1 /* Copyright (C) 2014 InfiniDB, Inc.
2 * Copyright (C) 2016-2020 MariaDB Corporation.
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 //
20 // $Id: distributedenginecomm.cpp 9655 2013-06-25 23:08:13Z xlou $
21 //
22 // C++ Implementation: distributedenginecomm
23 //
24 // Description:
25 //
26 //
27 // Author: <pfigg@calpont.com>, (C) 2006
28 //
29 // Copyright: See COPYING file that comes with this distribution
30 //
31 //
32
33 #include <sstream>
34 #include <stdexcept>
35 #include <cassert>
36 #include <ctime>
37 #include <algorithm>
38 #include <unistd.h>
39 #include <chrono>
40 #include <thread>
41 using namespace std;
42
43 #include <boost/scoped_array.hpp>
44 #include <boost/thread.hpp>
45 using namespace boost;
46
47 #include "distributedenginecomm.h"
48
49 #include "messagequeue.h"
50 #include "bytestream.h"
51 using namespace messageqcpp;
52
53 #include "configcpp.h"
54 using namespace config;
55
56 #include "errorids.h"
57 #include "exceptclasses.h"
58 #include "messagelog.h"
59 #include "messageobj.h"
60 #include "loggingid.h"
61 using namespace logging;
62
63 #include "liboamcpp.h"
64 using namespace oam;
65
66 #include "jobstep.h"
67 using namespace joblist;
68
69 #include "atomicops.h"
70
71 namespace
72 {
73
writeToLog(const char * file,int line,const string & msg,LOG_TYPE logto=LOG_TYPE_INFO)74 void writeToLog(const char* file, int line, const string& msg, LOG_TYPE logto = LOG_TYPE_INFO)
75 {
76 LoggingID lid(05);
77 MessageLog ml(lid);
78 Message::Args args;
79 Message m(0);
80 args.add(file);
81 args.add("@");
82 args.add(line);
83 args.add(msg);
84 m.format(args);
85
86 switch (logto)
87 {
88 case LOG_TYPE_DEBUG:
89 ml.logDebugMessage(m);
90 break;
91
92 case LOG_TYPE_INFO:
93 ml.logInfoMessage(m);
94 break;
95
96 case LOG_TYPE_WARNING:
97 ml.logWarningMessage(m);
98 break;
99
100 case LOG_TYPE_ERROR:
101 ml.logWarningMessage(m);
102 break;
103
104 case LOG_TYPE_CRITICAL:
105 ml.logCriticalMessage(m);
106 break;
107 }
108 }
109
110 // @bug 1463. this function is added for PM failover. for dual/more nic PM,
111 // this function is used to get the module name
getModuleNameByIPAddr(oam::ModuleTypeConfig moduletypeconfig,string ipAddress)112 string getModuleNameByIPAddr(oam::ModuleTypeConfig moduletypeconfig,
113 string ipAddress)
114 {
115 string modulename = "";
116 DeviceNetworkList::iterator pt = moduletypeconfig.ModuleNetworkList.begin();
117
118 for ( ; pt != moduletypeconfig.ModuleNetworkList.end() ; pt++)
119 {
120 modulename = (*pt).DeviceName;
121 HostConfigList::iterator pt1 = (*pt).hostConfigList.begin();
122
123 for ( ; pt1 != (*pt).hostConfigList.end() ; pt1++)
124 {
125 if (ipAddress == (*pt1).IPAddr)
126 return modulename;
127 }
128 }
129
130 return modulename;
131 }
132
133 struct EngineCommRunner
134 {
EngineCommRunner__anonac90c1cf0111::EngineCommRunner135 EngineCommRunner(joblist::DistributedEngineComm* jl,
136 boost::shared_ptr<MessageQueueClient> cl, uint32_t connectionIndex) : jbl(jl), client(cl),
137 connIndex(connectionIndex) {}
138 joblist::DistributedEngineComm* jbl;
139 boost::shared_ptr<MessageQueueClient> client;
140 uint32_t connIndex;
operator ()__anonac90c1cf0111::EngineCommRunner141 void operator()()
142 {
143 //cout << "Listening on client at 0x" << hex << (ptrdiff_t)client << dec << endl;
144 try
145 {
146 jbl->Listen(client, connIndex);
147 }
148 catch (std::exception& ex)
149 {
150 string what(ex.what());
151 cerr << "exception caught in EngineCommRunner: " << what << endl;
152
153 if (what.find("St9bad_alloc") != string::npos)
154 {
155 writeToLog(__FILE__, __LINE__, what, LOG_TYPE_CRITICAL);
156 // abort();
157 }
158 else writeToLog(__FILE__, __LINE__, what);
159 }
160 catch (...)
161 {
162 string msg("exception caught in EngineCommRunner.");
163 writeToLog(__FILE__, __LINE__, msg);
164 cerr << msg << endl;
165 }
166 }
167 };
168
// Functor that calls shutdown() on a queue-like object; usable with
// std::for_each over a container of queues.
// Bug fix: no longer derives from std::unary_function, which was deprecated
// in C++11 and removed outright in C++17; the base class contributed nothing.
template <typename T>
struct QueueShutdown
{
    void operator()(T& x)
    {
        x.shutdown();
    }
};
177
178 }
179
180 /** Debug macro */
181 #define THROTTLE_DEBUG 0
182 #if THROTTLE_DEBUG
183 #define THROTTLEDEBUG std::cout
184 #else
185 #define THROTTLEDEBUG if (false) std::cout
186 #endif
187
188 namespace joblist
189 {
190 DistributedEngineComm* DistributedEngineComm::fInstance = 0;
191
192 /*static*/
instance(ResourceManager * rm,bool isExeMgr)193 DistributedEngineComm* DistributedEngineComm::instance(ResourceManager* rm, bool isExeMgr)
194 {
195 if (fInstance == 0)
196 fInstance = new DistributedEngineComm(rm, isExeMgr);
197
198 return fInstance;
199 }
200
201 /*static*/
reset()202 void DistributedEngineComm::reset()
203 {
204 delete fInstance;
205 fInstance = 0;
206 }
207
DistributedEngineComm(ResourceManager * rm,bool isExeMgr)208 DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr) :
209 fRm(rm),
210 fLBIDShift(fRm->getPsLBID_Shift()),
211 pmCount(0),
212 fIsExeMgr(isExeMgr)
213 {
214
215 Setup();
216 }
217
~DistributedEngineComm()218 DistributedEngineComm::~DistributedEngineComm()
219 {
220 Close();
221 fInstance = 0;
222 }
223
// Establish (or re-establish) connections to every PM named in the config.
// Builds a fresh connection list in newClients/newLocks, then swaps it into
// fPmConnections/fWlock under a memory barrier. Called from the constructor
// and from Listen() (ExeMgr only) after a connection drops.
void DistributedEngineComm::Setup()
{
    // This is here to ensure that this function does not get invoked multiple times simultaneously.
    boost::mutex::scoped_lock setupLock(fSetupMutex);

    makeBusy(true);

    // This needs to be here to ensure that whenever Setup function is called, the lists are
    // empty. It's possible junk was left behind if exception.
    ClientList::iterator iter;

    for (iter = newClients.begin(); iter != newClients.end(); ++iter)
    {
        (*iter)->shutdown();
    }

    newClients.clear();
    newLocks.clear();

    uint32_t newPmCount = fRm->getPsCount();
    throttleThreshold = fRm->getDECThrottleThreshold();
    tbpsThreadCount = fRm->getJlNumScanReceiveThreads();
    unsigned numConnections = getNumConnections();
    oam::Oam oam;
    ModuleTypeConfig moduletypeconfig;

    try
    {
        oam.getSystemConfig("pm", moduletypeconfig);
    }
    catch (...)
    {
        writeToLog(__FILE__, __LINE__, "oam.getSystemConfig error, unknown exception", LOG_TYPE_ERROR);
        throw runtime_error("Setup failed");
    }

    // NOTE(review): if newPmCount is 0 the '% newPmCount' below divides by
    // zero; a 0-PM config is only logged here, not rejected — confirm the
    // caller guarantees at least one PM.
    if (newPmCount == 0)
        writeToLog(__FILE__, __LINE__, "Got a config file with 0 PMs",
                   LOG_TYPE_CRITICAL);

    //This needs to make sense when compared to the extent size
    // fLBIDShift = static_cast<unsigned>(config::Config::uFromText(fConfig->getConfig(section, "LBID_Shift")));

    // Resolve the address/port of each PMS entry (PMS1..PMSn) up front.
    auto* config = fRm->getConfig();
    std::vector<messageqcpp::AddrAndPortPair> pmsAddressesAndPorts;
    for (size_t i = 1; i <= newPmCount; ++i)
    {
        std::string pmConfigNodeName("PMS");
        pmConfigNodeName.append(std::to_string(i));
        // The port returned by getAddressAndPort can be 0 but this will be handled by
        // MessageQueueClient::connect
        pmsAddressesAndPorts.push_back(messageqcpp::getAddressAndPort(config,
                                       pmConfigNodeName));
    }

    // numConnections must be calculated as number of PMs * number of connections per PM.
    // This happens earlier in getNumConnections().
    for (size_t i = 0; i < numConnections; ++i)
    {
        // Connections are distributed round-robin over the PMs.
        size_t connectionId = i % newPmCount;
        boost::shared_ptr<MessageQueueClient> cl(new MessageQueueClient(pmsAddressesAndPorts[connectionId].first,
                                                 pmsAddressesAndPorts[connectionId].second));
        boost::shared_ptr<boost::mutex> nl(new boost::mutex());

        try
        {
            if (cl->connect())
            {
                newClients.push_back(cl);
                // assign the module name
                cl->moduleName(getModuleNameByIPAddr(moduletypeconfig, cl->addr2String()));
                newLocks.push_back(nl);
                // Each successful connection gets its own reader thread.
                StartClientListener(cl, i);
            }
            else
            {
                throw runtime_error("Connection refused from PMS" + std::to_string(connectionId));
            }
        }
        catch (std::exception& ex)
        {
            // Only the first round of connections (one per PM) affects the PM
            // count; later rounds are extra connections to PMs already counted.
            if (i < newPmCount)
                newPmCount--;

            writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId) + ": " + ex.what(), LOG_TYPE_ERROR);
            cerr << "Could not connect to PMS" << std::to_string(connectionId) << ": " << ex.what() << endl;
        }
        catch (...)
        {
            if (i < newPmCount)
                newPmCount--;

            writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId), LOG_TYPE_ERROR);
        }
    }

    // for every entry in newClients up to newPmCount, scan for the same ip in the
    // first pmCount. If there is no match, it's a new node,
    // call the event listeners' newPMOnline() callbacks.
    boost::mutex::scoped_lock lock(eventListenerLock);

    for (uint32_t i = 0; i < newPmCount; i++)
    {
        uint32_t j;

        for (j = 0; j < pmCount; j++)
        {
            if (newClients[i]->isSameAddr(*fPmConnections[j]))
                break;
        }

        if (j == pmCount)
            for (uint32_t k = 0; k < eventListeners.size(); k++)
                eventListeners[k]->newPMOnline(i);
    }

    lock.unlock();

    // Publish the new connection set, then the new count; the barrier keeps
    // the pmCount store from being reordered before the swaps.
    fWlock.swap(newLocks);
    fPmConnections.swap(newClients);
    // memory barrier to prevent the pmCount assignment migrating upward
    atomicops::atomicMb();
    pmCount = newPmCount;

    newLocks.clear();
    newClients.clear();
}
351
Close()352 int DistributedEngineComm::Close()
353 {
354 //cout << "DistributedEngineComm::Close() called" << endl;
355
356 makeBusy(false);
357 // for each MessageQueueClient in pmConnections delete the MessageQueueClient;
358 fPmConnections.clear();
359 fPmReader.clear();
360 return 0;
361 }
362
// Reader loop for one PM connection; runs on a thread started by
// StartClientListener(). Blocks in client->read() and routes every received
// message to the owning job's queue via addDataToOutput(). A zero-length read
// or any exception means the connection is gone: all session queues are then
// poisoned with an empty ByteStream so JobSteps error out (@bug 488), and on
// ExeMgr the connection set is rebuilt with Setup().
void DistributedEngineComm::Listen(boost::shared_ptr<MessageQueueClient> client, uint32_t connIndex)
{
    SBS sbs;

    try
    {
        while (Busy())
        {
            Stats stats;
            //TODO: This call blocks so setting Busy() in another thread doesn't work here...
            sbs = client->read(0, NULL, &stats);

            if (sbs->length() != 0)
            {
                addDataToOutput(sbs, connIndex, &stats);
            }
            else // got zero bytes on read, nothing more will come
                goto Error;
        }

        return;
    }
    catch (std::exception& e)
    {
        cerr << "DEC Caught EXCEPTION: " << e.what() << endl;
        goto Error;
    }
    catch (...)
    {
        cerr << "DEC Caught UNKNOWN EXCEPT" << endl;
        goto Error;
    }

Error:
    // @bug 488 - error condition! push 0 length bs to messagequeuemap and
    // eventually let jobstep error out.
    boost::mutex::scoped_lock lk(fMlock);
    MessageQueueMap::iterator map_tok;
    sbs.reset(new ByteStream(0));

    // Poison every registered session queue with the empty ByteStream; the
    // unackedWork bump keeps the ACK bookkeeping consistent for the push.
    for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
    {
        map_tok->second->queue.clear();
        (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
        map_tok->second->queue.push(sbs);
    }
    lk.unlock();

    if (fIsExeMgr)
    {
        //std::cout << "WARNING: DEC READ 0 LENGTH BS FROM "
        //  << client->otherEnd()<< " OR GOT AN EXCEPTION READING" << std::endl;
        decltype(pmCount) originalPMCount = pmCount;
        // Re-establish if a remote PM restarted.
        std::this_thread::sleep_for(std::chrono::seconds(3));
        Setup();
        // Setup() lowers pmCount for every PM it cannot reach; a change here
        // means this PM is really gone, so log the loss.
        if (originalPMCount != pmCount)
        {
            ostringstream os;
            os << "DEC: lost connection to " << client->addr2String();
            writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_ERROR);
        }

        /*
        // reset the pmconnection vector
        ClientList tempConns;
        boost::mutex::scoped_lock onErrLock(fOnErrMutex);
        string moduleName = client->moduleName();
        //cout << "moduleName=" << moduleName << endl;
        for ( uint32_t i = 0; i < fPmConnections.size(); i++)
        {
            if (moduleName != fPmConnections[i]->moduleName())
                tempConns.push_back(fPmConnections[i]);
            //else
            //cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << fPmConnections[i]->moduleName() << endl;
        }

        if (tempConns.size() == fPmConnections.size()) return;

        fPmConnections.swap(tempConns);
        pmCount = (pmCount == 0 ? 0 : pmCount - 1);
        //cout << "PMCOUNT=" << pmCount << endl;

        // log it
        ostringstream os;
        os << "DEC: lost connection to " << client->addr2String();
        writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL);
        */
    }
    return;
}
454
addQueue(uint32_t key,bool sendACKs)455 void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
456 {
457 bool b;
458
459 boost::mutex* lock = new boost::mutex();
460 condition* cond = new condition();
461 boost::shared_ptr<MQE> mqe(new MQE(pmCount));
462
463 mqe->queue = StepMsgQueue(lock, cond);
464 mqe->sendACKs = sendACKs;
465 mqe->throttled = false;
466
467 boost::mutex::scoped_lock lk ( fMlock );
468 b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second;
469
470 if (!b)
471 {
472 ostringstream os;
473 os << "DEC: attempt to add a queue with a duplicate ID " << key << endl;
474 throw runtime_error(os.str());
475 }
476 }
477
removeQueue(uint32_t key)478 void DistributedEngineComm::removeQueue(uint32_t key)
479 {
480 boost::mutex::scoped_lock lk(fMlock);
481 MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
482
483 if (map_tok == fSessionMessages.end())
484 return;
485
486 map_tok->second->queue.shutdown();
487 map_tok->second->queue.clear();
488 fSessionMessages.erase(map_tok);
489 }
490
shutdownQueue(uint32_t key)491 void DistributedEngineComm::shutdownQueue(uint32_t key)
492 {
493 boost::mutex::scoped_lock lk(fMlock);
494 MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
495
496 if (map_tok == fSessionMessages.end())
497 return;
498
499 map_tok->second->queue.shutdown();
500 map_tok->second->queue.clear();
501 }
502
read(uint32_t key,SBS & bs)503 void DistributedEngineComm::read(uint32_t key, SBS& bs)
504 {
505 boost::shared_ptr<MQE> mqe;
506
507 //Find the StepMsgQueueList for this session
508 boost::mutex::scoped_lock lk(fMlock);
509 MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
510
511 if (map_tok == fSessionMessages.end())
512 {
513 ostringstream os;
514
515 os << "DEC: attempt to read(bs) from a nonexistent queue\n";
516 throw runtime_error(os.str());
517 }
518
519 mqe = map_tok->second;
520 lk.unlock();
521
522 //this method can block: you can't hold any locks here...
523 TSQSize_t queueSize = mqe->queue.pop(&bs);
524
525 if (bs && mqe->sendACKs)
526 {
527 boost::mutex::scoped_lock lk(ackLock);
528
529 if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
530 setFlowControl(false, key, mqe);
531
532 vector<SBS> v;
533 v.push_back(bs);
534 sendAcks(key, v, mqe, queueSize.size);
535 }
536
537 if (!bs)
538 bs.reset(new ByteStream());
539 }
540
read(uint32_t key)541 const ByteStream DistributedEngineComm::read(uint32_t key)
542 {
543 SBS sbs;
544 boost::shared_ptr<MQE> mqe;
545
546 //Find the StepMsgQueueList for this session
547 boost::mutex::scoped_lock lk(fMlock);
548 MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
549
550 if (map_tok == fSessionMessages.end())
551 {
552 ostringstream os;
553
554 os << "DEC: read(): attempt to read from a nonexistent queue\n";
555 throw runtime_error(os.str());
556 }
557
558 mqe = map_tok->second;
559 lk.unlock();
560
561 TSQSize_t queueSize = mqe->queue.pop(&sbs);
562
563 if (sbs && mqe->sendACKs)
564 {
565 boost::mutex::scoped_lock lk(ackLock);
566
567 if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
568 setFlowControl(false, key, mqe);
569
570 vector<SBS> v;
571 v.push_back(sbs);
572 sendAcks(key, v, mqe, queueSize.size);
573 }
574
575 if (!sbs)
576 sbs.reset(new ByteStream());
577
578 return *sbs;
579 }
580
read_all(uint32_t key,vector<SBS> & v)581 void DistributedEngineComm::read_all(uint32_t key, vector<SBS>& v)
582 {
583 boost::shared_ptr<MQE> mqe;
584
585 boost::mutex::scoped_lock lk(fMlock);
586 MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
587
588 if (map_tok == fSessionMessages.end())
589 {
590 ostringstream os;
591 os << "DEC: read_all(): attempt to read from a nonexistent queue\n";
592 throw runtime_error(os.str());
593 }
594
595 mqe = map_tok->second;
596 lk.unlock();
597
598 mqe->queue.pop_all(v);
599
600 if (mqe->sendACKs)
601 {
602 boost::mutex::scoped_lock lk(ackLock);
603 sendAcks(key, v, mqe, 0);
604 }
605 }
606
read_some(uint32_t key,uint32_t divisor,vector<SBS> & v,bool * flowControlOn)607 void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS>& v,
608 bool* flowControlOn)
609 {
610 boost::shared_ptr<MQE> mqe;
611
612 boost::mutex::scoped_lock lk(fMlock);
613 MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
614
615 if (map_tok == fSessionMessages.end())
616 {
617 ostringstream os;
618
619 os << "DEC: read_some(): attempt to read from a nonexistent queue\n";
620 throw runtime_error(os.str());
621 }
622
623 mqe = map_tok->second;
624 lk.unlock();
625
626 TSQSize_t queueSize = mqe->queue.pop_some(divisor, v, 1); // need to play with the min #
627
628 if (flowControlOn)
629 *flowControlOn = false;
630
631 if (mqe->sendACKs)
632 {
633 boost::mutex::scoped_lock lk(ackLock);
634
635 if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
636 setFlowControl(false, key, mqe);
637
638 sendAcks(key, v, mqe, queueSize.size);
639
640 if (flowControlOn)
641 *flowControlOn = mqe->throttled;
642 }
643 }
644
// Send BATCH_PRIMITIVE_ACK messages for the consumed msgs of job 'uniqueID',
// distributing the ACK counts round-robin over the PMs (via nextPMToACK).
// When throttled, only the portion of the msgs that fits under the queue's
// target size is ACKed; the rest is debited from the unackedWork counters
// without an ACK so the accounting stays correct. Caller holds ackLock.
void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
                                     boost::shared_ptr<MQE> mqe, size_t queueSize)
{
    ISMPacketHeader* ism;
    uint32_t l_msgCount = msgs.size();

    /* If the current queue size > target, do nothing.
     * If the original queue size > target, ACK the msgs below the target.
     */
    if (!mqe->throttled || queueSize >= mqe->targetQueueSize)
    {
        /* no acks will be sent, but update unackedwork to keep the #s accurate */
        uint16_t numack = 0;
        uint32_t sockidx = 0;

        while (l_msgCount > 0)
        {
            nextPMToACK(mqe, l_msgCount, &sockidx, &numack);
            idbassert(numack <= l_msgCount);
            l_msgCount -= numack;
        }

        return;
    }

    size_t totalMsgSize = 0;

    for (uint32_t i = 0; i < msgs.size(); i++)
        totalMsgSize += msgs[i]->lengthWithHdrOverhead();

    if (queueSize + totalMsgSize > mqe->targetQueueSize)
    {
        /* update unackedwork for the overage that will never be acked */
        // Walk msgs until the cumulative size covers the overage; those
        // leading messages are debited but never ACKed. (queueSize < target
        // here, so the loop always stops inside msgs.)
        int64_t overage = queueSize + totalMsgSize - mqe->targetQueueSize;
        uint16_t numack = 0;
        uint32_t sockidx = 0;
        uint32_t msgsToIgnore;

        for (msgsToIgnore = 0; overage >= 0; msgsToIgnore++)
            overage -= msgs[msgsToIgnore]->lengthWithHdrOverhead();

        if (overage < 0)
            msgsToIgnore--;

        l_msgCount = msgs.size() - msgsToIgnore; // this num gets acked

        while (msgsToIgnore > 0)
        {
            nextPMToACK(mqe, msgsToIgnore, &sockidx, &numack);
            idbassert(numack <= msgsToIgnore);
            msgsToIgnore -= numack;
        }
    }

    if (l_msgCount > 0)
    {
        ByteStream msg(sizeof(ISMPacketHeader));
        uint16_t* toAck;
        vector<bool> pmAcked(pmCount, false);

        ism = (ISMPacketHeader*) msg.getInputPtr();
        // The only var checked by ReadThread is the Command var. The others
        // are wasted space. We hijack the Size, & Flags fields for the
        // params to the ACK msg.

        ism->Interleave = uniqueID;
        ism->Command = BATCH_PRIMITIVE_ACK;
        toAck = &ism->Size;

        msg.advanceInputPtr(sizeof(ISMPacketHeader));

        while (l_msgCount > 0)
        {
            /* could have to send up to pmCount ACKs */
            uint32_t sockIndex = 0;

            /* This will reset the ACK field in the Bytestream directly, and nothing
             * else needs to change if multiple msgs are sent. */
            nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
            idbassert(*toAck <= l_msgCount);
            l_msgCount -= *toAck;
            pmAcked[sockIndex] = true;
            writeToClient(sockIndex, msg);
        }

        // @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
        // This is apply to the big message case only. For small messages, the flow control is
        // disabled when the queue size is below the disableThreshold.
        if (mqe->hasBigMsgs)
        {
            uint64_t totalUnackedWork = 0;

            for (uint32_t i = 0; i < pmCount; i++)
                totalUnackedWork += mqe->unackedWork[i];

            if (totalUnackedWork == 0)
            {
                *toAck = 1;

                for (uint32_t i = 0; i < pmCount; i++)
                {
                    if (!pmAcked[i])
                        writeToClient(i, msg);
                }
            }
        }
    }
}
753
nextPMToACK(boost::shared_ptr<MQE> mqe,uint32_t maxAck,uint32_t * sockIndex,uint16_t * numToAck)754 void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t maxAck,
755 uint32_t* sockIndex, uint16_t* numToAck)
756 {
757 uint32_t i;
758 uint32_t& nextIndex = mqe->ackSocketIndex;
759
760 /* Other threads can be touching mqe->unackedWork at the same time, but because of
761 * the locking env, mqe->unackedWork can only grow; whatever gets latched in this fcn
762 * is a safe minimum at the point of use. */
763
764 if (mqe->unackedWork[nextIndex] >= maxAck)
765 {
766 (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], maxAck);
767 *sockIndex = nextIndex;
768 //FIXME: we're going to truncate here from 32 to 16 bits. Hopefully this will always fit...
769 *numToAck = maxAck;
770
771 if (pmCount > 0)
772 nextIndex = (nextIndex + 1) % pmCount;
773
774 return;
775 }
776 else
777 {
778 for (i = 0; i < pmCount; i++)
779 {
780 uint32_t curVal = mqe->unackedWork[nextIndex];
781 uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal);
782
783 if (unackedWork > 0)
784 {
785 (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], unackedWork);
786 *sockIndex = nextIndex;
787 *numToAck = unackedWork;
788
789 if (pmCount > 0)
790 nextIndex = (nextIndex + 1) % pmCount;
791
792 return;
793 }
794
795 if (pmCount > 0)
796 nextIndex = (nextIndex + 1) % pmCount;
797 }
798
799 cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! ";
800
801 for (i = 0; i < pmCount; i++)
802 cerr << mqe->unackedWork[i] << " ";
803
804 cerr << " max: " << maxAck;
805 cerr << endl;
806 //make sure the returned vars are legitimate
807 *sockIndex = nextIndex;
808 *numToAck = maxAck / pmCount;
809
810 if (pmCount > 0)
811 nextIndex = (nextIndex + 1) % pmCount;
812
813 return;
814 }
815 }
816
setFlowControl(bool enabled,uint32_t uniqueID,boost::shared_ptr<MQE> mqe)817 void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boost::shared_ptr<MQE> mqe)
818 {
819 mqe->throttled = enabled;
820 ByteStream msg(sizeof(ISMPacketHeader));
821 ISMPacketHeader* ism = (ISMPacketHeader*) msg.getInputPtr();
822
823 ism->Interleave = uniqueID;
824 ism->Command = BATCH_PRIMITIVE_ACK;
825 ism->Size = (enabled ? 0 : -1);
826
827 #ifdef VALGRIND
828 /* XXXPAT: For testing in valgrind, init the vars that don't get used */
829 ism->Flags = 0;
830 ism->Type = 0;
831 ism->MsgCount = 0;
832 ism->Status = 0;
833 #endif
834
835 msg.advanceInputPtr(sizeof(ISMPacketHeader));
836
837 for (uint32_t i = 0; i < mqe->pmCount; i++)
838 writeToClient(i, msg);
839 }
840
write(uint32_t senderID,ByteStream & msg)841 void DistributedEngineComm::write(uint32_t senderID, ByteStream& msg)
842 {
843 ISMPacketHeader* ism = (ISMPacketHeader*) msg.buf();
844 uint32_t dest;
845 uint32_t numConn = fPmConnections.size();
846
847 if (numConn > 0)
848 {
849 switch (ism->Command)
850 {
851 case BATCH_PRIMITIVE_CREATE:
852 /* Disable flow control initially */
853 msg << (uint32_t) - 1;
854 /* FALLTHRU */
855
856 case BATCH_PRIMITIVE_DESTROY:
857 case BATCH_PRIMITIVE_ADD_JOINER:
858 case BATCH_PRIMITIVE_END_JOINER:
859 case BATCH_PRIMITIVE_ABORT:
860 case DICT_CREATE_EQUALITY_FILTER:
861 case DICT_DESTROY_EQUALITY_FILTER:
862 /* XXXPAT: This relies on the assumption that the first pmCount "PMS*"
863 entries in the config file point to unique PMs */
864 uint32_t i;
865
866 for (i = 0; i < pmCount; i++)
867 writeToClient(i, msg, senderID);
868
869 return;
870
871 case BATCH_PRIMITIVE_RUN:
872 case DICT_TOKEN_BY_SCAN_COMPARE:
873 // for efficiency, writeToClient() grabs the interleaving factor for the caller,
874 // and decides the final connection index because it already grabs the
875 // caller's queue information
876 dest = ism->Interleave;
877 writeToClient(dest, msg, senderID, true);
878 break;
879
880 default:
881 idbassert_s(0, "Unknown message type");
882 }
883 }
884 else
885 {
886 writeToLog(__FILE__, __LINE__, "No PrimProcs are running", LOG_TYPE_DEBUG);
887 throw IDBExcept(ERR_NO_PRIMPROC);
888 }
889 }
890
write(messageqcpp::ByteStream & msg,uint32_t connection)891 void DistributedEngineComm::write(messageqcpp::ByteStream& msg, uint32_t connection)
892 {
893 ISMPacketHeader* ism = (ISMPacketHeader*) msg.buf();
894 PrimitiveHeader* pm = (PrimitiveHeader*) (ism + 1);
895 uint32_t senderID = pm->UniqueID;
896
897 boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t());
898 MessageQueueMap::iterator it;
899 // This keeps mqe's stats from being freed until end of function
900 boost::shared_ptr<MQE> mqe;
901 Stats* senderStats = NULL;
902
903 lk.lock();
904 it = fSessionMessages.find(senderID);
905
906 if (it != fSessionMessages.end())
907 {
908 mqe = it->second;
909 senderStats = &(mqe->stats);
910 }
911
912 lk.unlock();
913
914 newClients[connection]->write(msg, NULL, senderStats);
915 }
916
StartClientListener(boost::shared_ptr<MessageQueueClient> cl,uint32_t connIndex)917 void DistributedEngineComm::StartClientListener(boost::shared_ptr<MessageQueueClient> cl, uint32_t connIndex)
918 {
919 boost::thread* thrd = new boost::thread(EngineCommRunner(this, cl, connIndex));
920 fPmReader.push_back(thrd);
921 }
922
// Called by the reader threads (Listen) for every received message: looks up
// the owning job by the UniqueID in the primitive header, bumps that PM's
// unackedWork counter, pushes the message onto the job's queue, and — when
// ACKs are enabled — turns flow control on if the queue has grown past its
// target size. Messages for unknown (already torn down) jobs are dropped.
void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* stats)
{
    ISMPacketHeader* hdr = (ISMPacketHeader*)(sbs->buf());
    PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1);
    uint32_t uniqueId = p->UniqueID;
    boost::shared_ptr<MQE> mqe;

    boost::mutex::scoped_lock lk(fMlock);
    MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);

    if (map_tok == fSessionMessages.end())
    {
        // For debugging...
        //cerr << "DistributedEngineComm::AddDataToOutput: tried to add a message to a dead session: " << uniqueId << ", size " << sbs->length() << ", step id " << p->StepID << endl;
        return;
    }

    mqe = map_tok->second;
    lk.unlock();

    // Credit the PM this connection belongs to (connections are laid out
    // round-robin over the PMs, hence the modulo).
    if (pmCount > 0)
    {
        (void)atomicops::atomicInc(&mqe->unackedWork[connIndex % pmCount]);
    }

    TSQSize_t queueSize = mqe->queue.push(sbs);

    if (mqe->sendACKs)
    {
        boost::mutex::scoped_lock lk(ackLock);
        uint64_t msgSize = sbs->lengthWithHdrOverhead();

        // A single message over half the receive-queue target counts as "big":
        // grow the target so at least 3 such messages can be buffered.
        if (!mqe->throttled && msgSize > (targetRecvQueueSize / 2))
            doHasBigMsgs(mqe, (300 * 1024 * 1024 > 3 * msgSize ?
                               300 * 1024 * 1024 : 3 * msgSize)); //buffer at least 3 big msgs

        // Queue grew past target: tell the PMs to throttle.
        if (!mqe->throttled && queueSize.size >= mqe->targetQueueSize)
            setFlowControl(true, uniqueId, mqe);
    }

    if (stats)
        mqe->stats.dataRecvd(stats->dataRecvd());
}
966
doHasBigMsgs(boost::shared_ptr<MQE> mqe,uint64_t targetSize)967 void DistributedEngineComm::doHasBigMsgs(boost::shared_ptr<MQE> mqe, uint64_t targetSize)
968 {
969 mqe->hasBigMsgs = true;
970
971 if (mqe->targetQueueSize < targetSize)
972 mqe->targetQueueSize = targetSize;
973 }
974
// Write 'bs' to the PM connection chosen from 'index'. When 'sender' is a
// real job ID, the job's stats are credited and — with doInterleaving — the
// final connection is spread across the PM's multiple connections using the
// job's per-PM interleave counter. On a write failure every session queue is
// poisoned with a 0-length ByteStream (@bug 488) and an exception is thrown
// so the jobsteps error out.
int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uint32_t sender, bool doInterleaving)
{
    boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t());
    MessageQueueMap::iterator it;
    // Keep mqe's stats from being freed early
    boost::shared_ptr<MQE> mqe;
    Stats* senderStats = NULL;
    uint32_t interleaver = 0;

    if (fPmConnections.size() == 0)
        return 0;

    // numeric_limits<uint32_t>::max() is the "no sender" sentinel (default arg).
    if (sender != numeric_limits<uint32_t>::max())
    {
        lk.lock();
        it = fSessionMessages.find(sender);

        if (it != fSessionMessages.end())
        {
            mqe = it->second;
            senderStats = &(mqe->stats);

            // Post-increment: each call to the same PM advances the job's
            // interleave counter, rotating over that PM's connections.
            if (doInterleaving)
                interleaver = it->second->interleaver[index % it->second->pmCount]++;
        }

        lk.unlock();
    }

    try
    {
        // Connection layout is round-robin by PM, so stepping by pmCount
        // stays on the same PM while choosing among its connections.
        if (doInterleaving)
            index = (index + (interleaver * pmCount)) % fPmConnections.size();

        ClientList::value_type client = fPmConnections[index];

        if (!client->isAvailable()) return 0;

        // Per-connection write lock serializes writers on this socket.
        boost::mutex::scoped_lock lk(*(fWlock[index]));
        client->write(bs, NULL, senderStats);
        return 0;
    }
    catch (...)
    {
        // @bug 488. error out under such condition instead of re-trying other connection,
        // by pushing 0 size bytestream to messagequeue and throw exception
        SBS sbs;
        lk.lock();
        //std::cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << std::endl;
        MessageQueueMap::iterator map_tok;
        sbs.reset(new ByteStream(0));

        for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
        {
            map_tok->second->queue.clear();
            (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
            map_tok->second->queue.push(sbs);
        }

        lk.unlock();
        /*
        // reconfig the connection array
        ClientList tempConns;
        {
            //cout << "WARNING: DEC WRITE BROKEN PIPE " << fPmConnections[index]->otherEnd()<< endl;
            boost::mutex::scoped_lock onErrLock(fOnErrMutex);
            string moduleName = fPmConnections[index]->moduleName();
            //cout << "module name = " << moduleName << endl;
            if (index >= fPmConnections.size()) return 0;

            for (uint32_t i = 0; i < fPmConnections.size(); i++)
            {
                if (moduleName != fPmConnections[i]->moduleName())
                    tempConns.push_back(fPmConnections[i]);
            }
            if (tempConns.size() == fPmConnections.size()) return 0;
            fPmConnections.swap(tempConns);
            pmCount = (pmCount == 0 ? 0 : pmCount - 1);
        }
        // send alarm
        ALARMManager alarmMgr;
        string alarmItem("UNKNOWN");

        if (index < fPmConnections.size())
        {
            alarmItem = fPmConnections[index]->addr2String();
        }

        alarmItem.append(" PrimProc");
        alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);
        */
        throw runtime_error("DistributedEngineComm::write: Broken Pipe error");
    }
}
1069
size(uint32_t key)1070 uint32_t DistributedEngineComm::size(uint32_t key)
1071 {
1072 boost::mutex::scoped_lock lk(fMlock);
1073 MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
1074
1075 if (map_tok == fSessionMessages.end())
1076 throw runtime_error("DEC::size() attempt to get the size of a nonexistant queue!");
1077
1078 boost::shared_ptr<MQE> mqe = map_tok->second;
1079 //TODO: should probably check that this is a valid iter...
1080 lk.unlock();
1081 return mqe->queue.size().count;
1082 }
1083
addDECEventListener(DECEventListener * l)1084 void DistributedEngineComm::addDECEventListener(DECEventListener* l)
1085 {
1086 boost::mutex::scoped_lock lk(eventListenerLock);
1087 eventListeners.push_back(l);
1088 }
1089
removeDECEventListener(DECEventListener * l)1090 void DistributedEngineComm::removeDECEventListener(DECEventListener* l)
1091 {
1092 boost::mutex::scoped_lock lk(eventListenerLock);
1093 std::vector<DECEventListener*> newListeners;
1094 uint32_t s = eventListeners.size();
1095
1096 for (uint32_t i = 0; i < s; i++)
1097 if (eventListeners[i] != l)
1098 newListeners.push_back(eventListeners[i]);
1099
1100 eventListeners.swap(newListeners);
1101 }
1102
getNetworkStats(uint32_t uniqueID)1103 Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID)
1104 {
1105 boost::mutex::scoped_lock lk(fMlock);
1106 MessageQueueMap::iterator it;
1107 Stats empty;
1108
1109 it = fSessionMessages.find(uniqueID);
1110
1111 if (it != fSessionMessages.end())
1112 return it->second->stats;
1113
1114 return empty;
1115 }
1116
MQE(uint32_t pCount)1117 DistributedEngineComm::MQE::MQE(uint32_t pCount) : ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false),
1118 targetQueueSize(targetRecvQueueSize)
1119 {
1120 unackedWork.reset(new volatile uint32_t[pmCount]);
1121 interleaver.reset(new uint32_t[pmCount]);
1122 memset((void*) unackedWork.get(), 0, pmCount * sizeof(uint32_t));
1123 memset((void*) interleaver.get(), 0, pmCount * sizeof(uint32_t));
1124 }
1125
1126 }
1127 // vim:ts=4 sw=4:
1128