1 /* Copyright (C) 2014 InfiniDB, Inc.
2  * Copyright (C) 2016 MariaDB Corporation.
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 /*******************************************************************************
20 * $Id$
21 *
22 *******************************************************************************/
23 
24 /*
25  * we_splclient.cpp
26  *
27  *  Created on: Oct 20, 2011
28  *      Author: bpaul
29  */
30 
31 #include <cstdio>
32 #include <iostream>
33 #include <string>
34 using namespace std;
35 
36 #include "errorids.h"
37 #include "exceptclasses.h"
38 #include "messagelog.h"
39 #include "messageobj.h"
40 #include "loggingid.h"
41 using namespace logging;
42 
43 #include <boost/thread/mutex.hpp>
44 using namespace boost;
45 
46 #include "messagequeue.h"
47 #include "bytestream.h"
48 using namespace messageqcpp;
49 
50 #include "liboamcpp.h"
51 using namespace oam;
52 
53 #include "we_sdhandler.h"
54 #include "we_splclient.h"
55 
56 namespace WriteEngine
57 {
58 
59 //------------------------------------------------------------------------------
60 //BP 10/24/2011 14:25
61 //------------------------------------------------------------------------------
operator ()()62 void WESplClientRunner::operator()()
63 {
64     fOwner.sendAndRecv();
65 }
66 
67 //------------------------------------------------------------------------------
68 //BP 10/24/2011 14:25
69 //------------------------------------------------------------------------------
70 
71 
WESplClient(WESDHandler & Sdh,int PmId)72 WESplClient::WESplClient(WESDHandler& Sdh, int PmId):
73     fContinue(true),
74     fConnected(false),
75     fPmId(PmId),
76     fDbrCnt(0),
77     fDbrVar(0),
78     fDataRqstCnt(0),
79     fRdSecTo(0),
80     fRowTx(0),
81     fBytesTx(0),
82     fBytesRcv(0),
83     fLastInTime(0),
84     fStartTime(time(0)),
85     fSend(true),
86     fCpiStarted(false),
87     fCpiPassed(false),
88     fCpiFailed(false),
89     fBrmRptRcvd(false),
90     fRollbackRslt(0),
91     fCleanupRslt(0),
92     fServer(),
93     fClnt(),
94     fpThread(0),
95     fOwner(Sdh)
96 {
97     // TODO ctor
98 }
99 //------------------------------------------------------------------------------
~WESplClient()100 WESplClient::~WESplClient()
101 {
102     delete fpThread;
103     fpThread = 0;
104 }
105 //------------------------------------------------------------------------------
setup()106 void WESplClient::setup()
107 {
108     // do the setup stuffs here
109     if (fOwner.getDebugLvl())
110         cout << "setting connection to moduleid " << getPmId() << endl;
111 
112     char buff[32];
113     snprintf(buff, sizeof(buff), "pm%u_WriteEngineServer", getPmId());
114     fServer = buff;
115 
116     fClnt.reset(new MessageQueueClient(fServer));
117 
118     if (fOwner.getDebugLvl())
119         cout << "WEServer : " << fServer << " " << fClnt->addr2String() << endl;
120 
121     try
122     {
123         if (fClnt->connect())
124         {
125             onConnect();
126             startClientThread();
127         }
128         else
129         {
130             throw runtime_error("Connection refused");
131         }
132     }
133     catch (std::exception& ex)
134     {
135         cerr << "Could not connect to " << fServer << ": " << ex.what() << endl;
136         throw runtime_error("Problem in connecting to PM");
137     }
138     catch (...)
139     {
140         cerr << "Could not connect to " << fServer  << endl;
141         throw runtime_error("Problem in connecting to PM");
142     }
143 
144 
145 }
146 //------------------------------------------------------------------------------
startClientThread()147 void WESplClient::startClientThread()
148 {
149     this->fpThread = new boost::thread(WESplClientRunner(*this));
150 }
151 //------------------------------------------------------------------------------
sendAndRecv()152 void WESplClient::sendAndRecv()
153 {
154     while (fContinue)
155     {
156         try
157         {
158             // Send messages if out queue has something
159             send();
160             // Recv messages if there is something in socket or timeout
161             recv();
162         }
163         catch (runtime_error&)
164         {
165             //setCpiFailed(true); - done in onDisconnect() BUG
166             setConnected(false);
167 
168             if (fOwner.getDebugLvl())
169                 cout << "Disconnect from PM - " << getPmId() << endl;
170 
171             onDisconnect();
172         }
173     }
174 
175     if (this->fCpiFailed)
176     {
177         // NOTE : commented out to avoid sending rollback twice.
178         //fOwner.onCpimportFail(this->getPmId());
179         char aDefCon[16], aRedCol[16];
180         snprintf(aDefCon, sizeof(aDefCon), "\033[0m");
181         snprintf(aRedCol, sizeof(aRedCol), "\033[0;31m");
182 
183         if (fOwner.getDebugLvl())
184             cout << aRedCol << "Bulk load FAILED on PM "
185                  << getPmId() << aDefCon << endl;
186     }
187     else if (this->fCpiPassed)
188     {
189         //if(fOwner.getDebugLvl())
190         //BUG 4195
191         char aDefCon[16], aGreenCol[16];
192         snprintf(aDefCon, sizeof(aDefCon), "\033[0m");
193         snprintf(aGreenCol, sizeof(aGreenCol), "\033[0;32m");
194 
195         if (fOwner.getDebugLvl())
196             cout << aGreenCol << "Bulk load Finished Successfully on PM "
197                  << getPmId() << aDefCon << endl;
198     }
199     else if (!this->fCpiStarted)
200     {
201         if (fOwner.getDebugLvl())
202             cout << "Cpimport Failed to Start!!!" << this->getPmId() << endl;
203     }
204 }
205 //------------------------------------------------------------------------------
send()206 void WESplClient::send()
207 {
208 
209     if ((!fSendQueue.empty()) && (getDataRqstCount() > 0))
210     {
211         if (fOwner.getDebugLvl() > 2)
212             cout << "DataRqstCnt [" << getPmId() << "] = "
213                  << getDataRqstCount() << endl;
214 
215         boost::mutex::scoped_lock aLock(fSentQMutex);
216         messageqcpp::SBS aSbs = fSendQueue.front();
217         fSendQueue.pop();
218         aLock.unlock();
219         int aLen = (*aSbs).length();
220 
221         if (aLen > 0)
222         {
223             boost::mutex::scoped_lock aLock(fWriteMutex);
224             setBytesTx(getBytesTx() + aLen);
225 
226             try
227             {
228                 if (isConnected())
229                     fClnt->write(aSbs);
230             }
231             catch (...)
232             {
233             }
234 
235             aLock.unlock();
236         }
237 
238         decDataRqstCount();
239         //decDbRootVar();
240     }
241 
242     //setSendFlag(fOwner.check4Ack(fPmId));
243 
244 }
245 //------------------------------------------------------------------------------
recv()246 void WESplClient::recv()
247 {
248     messageqcpp::SBS aSbs;
249     struct timespec rm_ts;
250     rm_ts.tv_sec = fRdSecTo;			//0 when data sending otherwise 1- second
251     rm_ts.tv_nsec = 20000000;			// 20 milliSec
252     bool isTimeOut = false;
253     int aLen = 0;
254 
255     try
256     {
257         if (isConnected())
258             aSbs = fClnt->read(&rm_ts, &isTimeOut);
259     }
260     catch (std::exception& ex)
261     {
262         setConnected(false);
263         cout << ex.what() << endl;
264         cout << "fClnt read error on " << getPmId() << endl;
265         throw runtime_error("fClnt read error");
266     }
267 
268     // - aSbs->length()>0 add to the sdh.fWesMsgQueue
269     try
270     {
271         if (aSbs)
272             aLen = aSbs->length();
273     }
274     catch (...)
275     {
276         aLen = 0;
277     }
278 
279     if (aLen > 0)
280     {
281         setLastInTime(time(0));  //added back for BUG 4535 / BUG 4195
282         setBytesRcv( getBytesRcv() + aLen);
283         fOwner.add2RespQueue(aSbs);
284     }
285     else if ((aLen <= 0) && (!isTimeOut))	//disconnect
286     {
287         cout << "Disconnect from PM - " << getPmId() << " IP " << endl;
288         onDisconnect();
289     }
290 
291 }
292 //------------------------------------------------------------------------------
write(const messageqcpp::ByteStream & Msg)293 void WESplClient::write(const messageqcpp::ByteStream& Msg)
294 {
295     setBytesTx(getBytesTx() + Msg.length());
296 
297     try
298     {
299         if (Msg.length() > 0)
300             fClnt->write(Msg);
301     }
302     catch (...)
303     {
304         //ignore it
305     }
306 }
307 //------------------------------------------------------------------------------
read(messageqcpp::SBS & Sbs)308 void WESplClient::read(messageqcpp::SBS& Sbs)
309 {
310     // read from the WEServerMsgQueue
311     // if Key is needed give that constant here
312 }
313 //------------------------------------------------------------------------------
314 //TODO - We may need to make it much more efficient by incorporating file read
add2SendQueue(const messageqcpp::SBS & Sbs)315 void WESplClient::add2SendQueue(const messageqcpp::SBS& Sbs)
316 {
317     this->fSendQueue.push(Sbs);
318 }
319 
320 
clearSendQueue()321 void WESplClient::clearSendQueue()
322 {
323     boost::mutex::scoped_lock aLock(fSentQMutex);
324 
325     while (!fSendQueue.empty())
326         fSendQueue.pop();
327 
328     aLock.unlock();
329 }
330 
getSendQSize()331 int WESplClient::getSendQSize()
332 {
333     int aQSize = 0;
334     boost::mutex::scoped_lock aLock(fSentQMutex);
335     aQSize = fSendQueue.size();
336     aLock.unlock();
337     return aQSize;
338 }
339 
340 
341 //------------------------------------------------------------------------------
printStats()342 void WESplClient::printStats()
343 {
344     if (fOwner.getDebugLvl())
345     {
346         cout << "\tPMid      \t" << getPmId() << endl;
347         cout << "\tTx Rows   \t" << getRowTx() << endl;
348         //if(fOwner.getDebugLvl())
349         cout << "\tTx Bytes  \t" << getBytesTx() << endl;
350         //if(fOwner.getDebugLvl())
351         cout << "\tRcv Bytes \t" << getBytesRcv() << endl;
352         cout << "\tInserted/Read Rows   " << fRowsUploadInfo.fRowsInserted << "/"
353              << fRowsUploadInfo.fRowsRead << endl;
354 
355         if (fColOorVec.size() > 0)
356             cout << "\tCol Id\tColName\t\t\tout-of-range count" << endl;
357 
358         WEColOorVec::iterator aIt = fColOorVec.begin();
359 
360         while (aIt != fColOorVec.end())
361         {
362             cout << "\t" << (*aIt).fColNum << "\t" << (*aIt).fColName << "\t\t" << (*aIt).fNoOfOORs << endl;
363             aIt++;
364         }
365 
366         if (!fBadDataFile.empty())cout << "\tBad Data Filename    " << fBadDataFile << endl;
367 
368         if (!fErrInfoFile.empty())cout << "\tError Filename       " << fErrInfoFile << endl;
369 
370         cout << "\t(" << getLastInTime() - getStartTime() << "sec)" << endl;
371         cout << "\t" << endl;
372     }
373 }
374 //------------------------------------------------------------------------------
375 
onConnect()376 void WESplClient::onConnect()
377 {
378     //TODO
379     // update all the flags on Connect.
380     // alert data can be send now
381     // do not allow to connect back again.
382 
383     // when reconnect happens, reset the variables
384     setRollbackRslt(0);
385     setCleanupRslt(0);
386     setCpiPassed(false);
387     setCpiFailed(false);
388 
389     setContinue(true);
390     setConnected(true);
391     ByteStream bsWrite;
392     bsWrite << (ByteStream::byte) WE_CLT_SRV_KEEPALIVE;
393 
394     try
395     {
396         this->write(bsWrite);				// send the keep init keep alive
397     }
398     catch (...)
399     {
400     }
401 
402     // need to send Alarm
403     fIpAddress = fClnt->addr2String();
404 
405 }
406 //------------------------------------------------------------------------------
onDisconnect()407 void WESplClient::onDisconnect()
408 {
409     //TODO
410     // - set fContinue to false  - set the thread free
411     setContinue(false);
412     setConnected(false);
413     setRollbackRslt(-1);
414     setCleanupRslt(-1);
415 
416     if ((!fCpiPassed) && (!fCpiFailed))	//a hard disconnection
417     {
418         fOwner.onCpimportFail(fPmId);
419         fOwner.setDisconnectFailure(true);
420     }
421 
422     // update all the flags of disconnect.
423     // alert on roll back
424     // do not allow to connect back again.
425 
426     try
427     {
428         // BT Should log this probably instead
429         // ALARMManager alarmMgr;
430         //std::string alarmItem = sin_addr2String(fClnt->serv_addr().sin_addr);
431         std::string alarmItem = fClnt->addr2String();
432         alarmItem.append(" WriteEngineServer");
433         //alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);
434     }
435     catch (...)
436     {
437         // just ignore it for time being.
438     }
439 }
440 
441 //------------------------------------------------------------------------------
442 
setRowsUploadInfo(int64_t RowsRead,int64_t RowsInserted)443 void WESplClient::setRowsUploadInfo(int64_t RowsRead, int64_t RowsInserted)
444 {
445     fRowsUploadInfo.fRowsRead = RowsRead;
446     fRowsUploadInfo.fRowsInserted = RowsInserted;
447 }
448 
449 
450 //------------------------------------------------------------------------------
451 
add2ColOutOfRangeInfo(int ColNum,execplan::CalpontSystemCatalog::ColDataType ColType,std::string & ColName,int NoOfOors)452 void WESplClient::add2ColOutOfRangeInfo(int ColNum,
453   execplan::CalpontSystemCatalog::ColDataType ColType,
454                                         std::string&  ColName, int NoOfOors)
455 {
456     WEColOORInfo aColOorInfo;
457     aColOorInfo.fColNum = ColNum;
458     aColOorInfo.fColType = ColType;
459     aColOorInfo.fColName = ColName;
460     aColOorInfo.fNoOfOORs = NoOfOors;
461     fColOorVec.push_back(aColOorInfo);
462 }
463 
464 //------------------------------------------------------------------------------
465 
setBadDataFile(const std::string & BadDataFile)466 void WESplClient::setBadDataFile(const std::string& BadDataFile)
467 {
468     fBadDataFile = BadDataFile;
469 }
470 
471 //------------------------------------------------------------------------------
472 
473 
setErrInfoFile(const std::string & ErrInfoFile)474 void WESplClient::setErrInfoFile(const std::string& ErrInfoFile)
475 {
476     fErrInfoFile = ErrInfoFile;
477 }
478 
479 //------------------------------------------------------------------------------
480 
481 
482 } /* namespace WriteEngine */
483