1 /* Copyright (C) 2014 InfiniDB, Inc.
2 * Copyright (C) 2016 MariaDB Corporation.
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 /*******************************************************************************
20 * $Id$
21 *
22 *******************************************************************************/
23
24 /*
25 * we_splclient.cpp
26 *
27 * Created on: Oct 20, 2011
28 * Author: bpaul
29 */
30
31 #include <cstdio>
32 #include <iostream>
33 #include <string>
34 using namespace std;
35
36 #include "errorids.h"
37 #include "exceptclasses.h"
38 #include "messagelog.h"
39 #include "messageobj.h"
40 #include "loggingid.h"
41 using namespace logging;
42
43 #include <boost/thread/mutex.hpp>
44 using namespace boost;
45
46 #include "messagequeue.h"
47 #include "bytestream.h"
48 using namespace messageqcpp;
49
50 #include "liboamcpp.h"
51 using namespace oam;
52
53 #include "we_sdhandler.h"
54 #include "we_splclient.h"
55
56 namespace WriteEngine
57 {
58
59 //------------------------------------------------------------------------------
60 //BP 10/24/2011 14:25
61 //------------------------------------------------------------------------------
operator ()()62 void WESplClientRunner::operator()()
63 {
64 fOwner.sendAndRecv();
65 }
66
67 //------------------------------------------------------------------------------
68 //BP 10/24/2011 14:25
69 //------------------------------------------------------------------------------
70
71
WESplClient(WESDHandler & Sdh,int PmId)72 WESplClient::WESplClient(WESDHandler& Sdh, int PmId):
73 fContinue(true),
74 fConnected(false),
75 fPmId(PmId),
76 fDbrCnt(0),
77 fDbrVar(0),
78 fDataRqstCnt(0),
79 fRdSecTo(0),
80 fRowTx(0),
81 fBytesTx(0),
82 fBytesRcv(0),
83 fLastInTime(0),
84 fStartTime(time(0)),
85 fSend(true),
86 fCpiStarted(false),
87 fCpiPassed(false),
88 fCpiFailed(false),
89 fBrmRptRcvd(false),
90 fRollbackRslt(0),
91 fCleanupRslt(0),
92 fServer(),
93 fClnt(),
94 fpThread(0),
95 fOwner(Sdh)
96 {
97 // TODO ctor
98 }
99 //------------------------------------------------------------------------------
~WESplClient()100 WESplClient::~WESplClient()
101 {
102 delete fpThread;
103 fpThread = 0;
104 }
105 //------------------------------------------------------------------------------
setup()106 void WESplClient::setup()
107 {
108 // do the setup stuffs here
109 if (fOwner.getDebugLvl())
110 cout << "setting connection to moduleid " << getPmId() << endl;
111
112 char buff[32];
113 snprintf(buff, sizeof(buff), "pm%u_WriteEngineServer", getPmId());
114 fServer = buff;
115
116 fClnt.reset(new MessageQueueClient(fServer));
117
118 if (fOwner.getDebugLvl())
119 cout << "WEServer : " << fServer << " " << fClnt->addr2String() << endl;
120
121 try
122 {
123 if (fClnt->connect())
124 {
125 onConnect();
126 startClientThread();
127 }
128 else
129 {
130 throw runtime_error("Connection refused");
131 }
132 }
133 catch (std::exception& ex)
134 {
135 cerr << "Could not connect to " << fServer << ": " << ex.what() << endl;
136 throw runtime_error("Problem in connecting to PM");
137 }
138 catch (...)
139 {
140 cerr << "Could not connect to " << fServer << endl;
141 throw runtime_error("Problem in connecting to PM");
142 }
143
144
145 }
146 //------------------------------------------------------------------------------
startClientThread()147 void WESplClient::startClientThread()
148 {
149 this->fpThread = new boost::thread(WESplClientRunner(*this));
150 }
151 //------------------------------------------------------------------------------
sendAndRecv()152 void WESplClient::sendAndRecv()
153 {
154 while (fContinue)
155 {
156 try
157 {
158 // Send messages if out queue has something
159 send();
160 // Recv messages if there is something in socket or timeout
161 recv();
162 }
163 catch (runtime_error&)
164 {
165 //setCpiFailed(true); - done in onDisconnect() BUG
166 setConnected(false);
167
168 if (fOwner.getDebugLvl())
169 cout << "Disconnect from PM - " << getPmId() << endl;
170
171 onDisconnect();
172 }
173 }
174
175 if (this->fCpiFailed)
176 {
177 // NOTE : commented out to avoid sending rollback twice.
178 //fOwner.onCpimportFail(this->getPmId());
179 char aDefCon[16], aRedCol[16];
180 snprintf(aDefCon, sizeof(aDefCon), "\033[0m");
181 snprintf(aRedCol, sizeof(aRedCol), "\033[0;31m");
182
183 if (fOwner.getDebugLvl())
184 cout << aRedCol << "Bulk load FAILED on PM "
185 << getPmId() << aDefCon << endl;
186 }
187 else if (this->fCpiPassed)
188 {
189 //if(fOwner.getDebugLvl())
190 //BUG 4195
191 char aDefCon[16], aGreenCol[16];
192 snprintf(aDefCon, sizeof(aDefCon), "\033[0m");
193 snprintf(aGreenCol, sizeof(aGreenCol), "\033[0;32m");
194
195 if (fOwner.getDebugLvl())
196 cout << aGreenCol << "Bulk load Finished Successfully on PM "
197 << getPmId() << aDefCon << endl;
198 }
199 else if (!this->fCpiStarted)
200 {
201 if (fOwner.getDebugLvl())
202 cout << "Cpimport Failed to Start!!!" << this->getPmId() << endl;
203 }
204 }
205 //------------------------------------------------------------------------------
send()206 void WESplClient::send()
207 {
208
209 if ((!fSendQueue.empty()) && (getDataRqstCount() > 0))
210 {
211 if (fOwner.getDebugLvl() > 2)
212 cout << "DataRqstCnt [" << getPmId() << "] = "
213 << getDataRqstCount() << endl;
214
215 boost::mutex::scoped_lock aLock(fSentQMutex);
216 messageqcpp::SBS aSbs = fSendQueue.front();
217 fSendQueue.pop();
218 aLock.unlock();
219 int aLen = (*aSbs).length();
220
221 if (aLen > 0)
222 {
223 boost::mutex::scoped_lock aLock(fWriteMutex);
224 setBytesTx(getBytesTx() + aLen);
225
226 try
227 {
228 if (isConnected())
229 fClnt->write(aSbs);
230 }
231 catch (...)
232 {
233 }
234
235 aLock.unlock();
236 }
237
238 decDataRqstCount();
239 //decDbRootVar();
240 }
241
242 //setSendFlag(fOwner.check4Ack(fPmId));
243
244 }
245 //------------------------------------------------------------------------------
recv()246 void WESplClient::recv()
247 {
248 messageqcpp::SBS aSbs;
249 struct timespec rm_ts;
250 rm_ts.tv_sec = fRdSecTo; //0 when data sending otherwise 1- second
251 rm_ts.tv_nsec = 20000000; // 20 milliSec
252 bool isTimeOut = false;
253 int aLen = 0;
254
255 try
256 {
257 if (isConnected())
258 aSbs = fClnt->read(&rm_ts, &isTimeOut);
259 }
260 catch (std::exception& ex)
261 {
262 setConnected(false);
263 cout << ex.what() << endl;
264 cout << "fClnt read error on " << getPmId() << endl;
265 throw runtime_error("fClnt read error");
266 }
267
268 // - aSbs->length()>0 add to the sdh.fWesMsgQueue
269 try
270 {
271 if (aSbs)
272 aLen = aSbs->length();
273 }
274 catch (...)
275 {
276 aLen = 0;
277 }
278
279 if (aLen > 0)
280 {
281 setLastInTime(time(0)); //added back for BUG 4535 / BUG 4195
282 setBytesRcv( getBytesRcv() + aLen);
283 fOwner.add2RespQueue(aSbs);
284 }
285 else if ((aLen <= 0) && (!isTimeOut)) //disconnect
286 {
287 cout << "Disconnect from PM - " << getPmId() << " IP " << endl;
288 onDisconnect();
289 }
290
291 }
292 //------------------------------------------------------------------------------
write(const messageqcpp::ByteStream & Msg)293 void WESplClient::write(const messageqcpp::ByteStream& Msg)
294 {
295 setBytesTx(getBytesTx() + Msg.length());
296
297 try
298 {
299 if (Msg.length() > 0)
300 fClnt->write(Msg);
301 }
302 catch (...)
303 {
304 //ignore it
305 }
306 }
307 //------------------------------------------------------------------------------
read(messageqcpp::SBS & Sbs)308 void WESplClient::read(messageqcpp::SBS& Sbs)
309 {
310 // read from the WEServerMsgQueue
311 // if Key is needed give that constant here
312 }
313 //------------------------------------------------------------------------------
314 //TODO - We may need to make it much more efficient by incorporating file read
add2SendQueue(const messageqcpp::SBS & Sbs)315 void WESplClient::add2SendQueue(const messageqcpp::SBS& Sbs)
316 {
317 this->fSendQueue.push(Sbs);
318 }
319
320
clearSendQueue()321 void WESplClient::clearSendQueue()
322 {
323 boost::mutex::scoped_lock aLock(fSentQMutex);
324
325 while (!fSendQueue.empty())
326 fSendQueue.pop();
327
328 aLock.unlock();
329 }
330
getSendQSize()331 int WESplClient::getSendQSize()
332 {
333 int aQSize = 0;
334 boost::mutex::scoped_lock aLock(fSentQMutex);
335 aQSize = fSendQueue.size();
336 aLock.unlock();
337 return aQSize;
338 }
339
340
341 //------------------------------------------------------------------------------
printStats()342 void WESplClient::printStats()
343 {
344 if (fOwner.getDebugLvl())
345 {
346 cout << "\tPMid \t" << getPmId() << endl;
347 cout << "\tTx Rows \t" << getRowTx() << endl;
348 //if(fOwner.getDebugLvl())
349 cout << "\tTx Bytes \t" << getBytesTx() << endl;
350 //if(fOwner.getDebugLvl())
351 cout << "\tRcv Bytes \t" << getBytesRcv() << endl;
352 cout << "\tInserted/Read Rows " << fRowsUploadInfo.fRowsInserted << "/"
353 << fRowsUploadInfo.fRowsRead << endl;
354
355 if (fColOorVec.size() > 0)
356 cout << "\tCol Id\tColName\t\t\tout-of-range count" << endl;
357
358 WEColOorVec::iterator aIt = fColOorVec.begin();
359
360 while (aIt != fColOorVec.end())
361 {
362 cout << "\t" << (*aIt).fColNum << "\t" << (*aIt).fColName << "\t\t" << (*aIt).fNoOfOORs << endl;
363 aIt++;
364 }
365
366 if (!fBadDataFile.empty())cout << "\tBad Data Filename " << fBadDataFile << endl;
367
368 if (!fErrInfoFile.empty())cout << "\tError Filename " << fErrInfoFile << endl;
369
370 cout << "\t(" << getLastInTime() - getStartTime() << "sec)" << endl;
371 cout << "\t" << endl;
372 }
373 }
374 //------------------------------------------------------------------------------
375
onConnect()376 void WESplClient::onConnect()
377 {
378 //TODO
379 // update all the flags on Connect.
380 // alert data can be send now
381 // do not allow to connect back again.
382
383 // when reconnect happens, reset the variables
384 setRollbackRslt(0);
385 setCleanupRslt(0);
386 setCpiPassed(false);
387 setCpiFailed(false);
388
389 setContinue(true);
390 setConnected(true);
391 ByteStream bsWrite;
392 bsWrite << (ByteStream::byte) WE_CLT_SRV_KEEPALIVE;
393
394 try
395 {
396 this->write(bsWrite); // send the keep init keep alive
397 }
398 catch (...)
399 {
400 }
401
402 // need to send Alarm
403 fIpAddress = fClnt->addr2String();
404
405 }
406 //------------------------------------------------------------------------------
onDisconnect()407 void WESplClient::onDisconnect()
408 {
409 //TODO
410 // - set fContinue to false - set the thread free
411 setContinue(false);
412 setConnected(false);
413 setRollbackRslt(-1);
414 setCleanupRslt(-1);
415
416 if ((!fCpiPassed) && (!fCpiFailed)) //a hard disconnection
417 {
418 fOwner.onCpimportFail(fPmId);
419 fOwner.setDisconnectFailure(true);
420 }
421
422 // update all the flags of disconnect.
423 // alert on roll back
424 // do not allow to connect back again.
425
426 try
427 {
428 // BT Should log this probably instead
429 // ALARMManager alarmMgr;
430 //std::string alarmItem = sin_addr2String(fClnt->serv_addr().sin_addr);
431 std::string alarmItem = fClnt->addr2String();
432 alarmItem.append(" WriteEngineServer");
433 //alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);
434 }
435 catch (...)
436 {
437 // just ignore it for time being.
438 }
439 }
440
441 //------------------------------------------------------------------------------
442
setRowsUploadInfo(int64_t RowsRead,int64_t RowsInserted)443 void WESplClient::setRowsUploadInfo(int64_t RowsRead, int64_t RowsInserted)
444 {
445 fRowsUploadInfo.fRowsRead = RowsRead;
446 fRowsUploadInfo.fRowsInserted = RowsInserted;
447 }
448
449
450 //------------------------------------------------------------------------------
451
add2ColOutOfRangeInfo(int ColNum,execplan::CalpontSystemCatalog::ColDataType ColType,std::string & ColName,int NoOfOors)452 void WESplClient::add2ColOutOfRangeInfo(int ColNum,
453 execplan::CalpontSystemCatalog::ColDataType ColType,
454 std::string& ColName, int NoOfOors)
455 {
456 WEColOORInfo aColOorInfo;
457 aColOorInfo.fColNum = ColNum;
458 aColOorInfo.fColType = ColType;
459 aColOorInfo.fColName = ColName;
460 aColOorInfo.fNoOfOORs = NoOfOors;
461 fColOorVec.push_back(aColOorInfo);
462 }
463
464 //------------------------------------------------------------------------------
465
setBadDataFile(const std::string & BadDataFile)466 void WESplClient::setBadDataFile(const std::string& BadDataFile)
467 {
468 fBadDataFile = BadDataFile;
469 }
470
471 //------------------------------------------------------------------------------
472
473
setErrInfoFile(const std::string & ErrInfoFile)474 void WESplClient::setErrInfoFile(const std::string& ErrInfoFile)
475 {
476 fErrInfoFile = ErrInfoFile;
477 }
478
479 //------------------------------------------------------------------------------
480
481
482 } /* namespace WriteEngine */
483