1 /* Copyright (C) 2014 InfiniDB, Inc.
2  * Copyright (C) 2016-2020 MariaDB Corporation.
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //
20 // $Id: distributedenginecomm.cpp 9655 2013-06-25 23:08:13Z xlou $
21 //
22 // C++ Implementation: distributedenginecomm
23 //
24 // Description:
25 //
26 //
27 // Author:  <pfigg@calpont.com>, (C) 2006
28 //
29 // Copyright: See COPYING file that comes with this distribution
30 //
31 //
32 
33 #include <sstream>
34 #include <stdexcept>
35 #include <cassert>
36 #include <ctime>
37 #include <algorithm>
38 #include <unistd.h>
39 #include <chrono>
40 #include <thread>
41 using namespace std;
42 
43 #include <boost/scoped_array.hpp>
44 #include <boost/thread.hpp>
45 using namespace boost;
46 
47 #include "distributedenginecomm.h"
48 
49 #include "messagequeue.h"
50 #include "bytestream.h"
51 using namespace messageqcpp;
52 
53 #include "configcpp.h"
54 using namespace config;
55 
56 #include "errorids.h"
57 #include "exceptclasses.h"
58 #include "messagelog.h"
59 #include "messageobj.h"
60 #include "loggingid.h"
61 using namespace logging;
62 
63 #include "liboamcpp.h"
64 using namespace oam;
65 
66 #include "jobstep.h"
67 using namespace joblist;
68 
69 #include "atomicops.h"
70 
71 namespace
72 {
73 
74 void writeToLog(const char* file, int line, const string& msg, LOG_TYPE logto = LOG_TYPE_INFO)
75 {
76     LoggingID lid(05);
77     MessageLog ml(lid);
78     Message::Args args;
79     Message m(0);
80     args.add(file);
81     args.add("@");
82     args.add(line);
83     args.add(msg);
84     m.format(args);
85 
86     switch (logto)
87     {
88         case LOG_TYPE_DEBUG:
89             ml.logDebugMessage(m);
90             break;
91 
92         case LOG_TYPE_INFO:
93             ml.logInfoMessage(m);
94             break;
95 
96         case LOG_TYPE_WARNING:
97             ml.logWarningMessage(m);
98             break;
99 
100         case LOG_TYPE_ERROR:
101             ml.logErrorMessage(m);
102             break;
103 
104         case LOG_TYPE_CRITICAL:
105             ml.logCriticalMessage(m);
106             break;
107     }
108 }
109 
110 // @bug 1463. This function was added for PM failover. For PMs with two or more NICs,
111 // it is used to look up the module name by IP address.
112 string getModuleNameByIPAddr(oam::ModuleTypeConfig moduletypeconfig,
113                              string ipAddress)
114 {
115     string modulename = "";
116     DeviceNetworkList::iterator pt = moduletypeconfig.ModuleNetworkList.begin();
117 
118     for ( ; pt != moduletypeconfig.ModuleNetworkList.end() ; pt++)
119     {
120         modulename = (*pt).DeviceName;
121         HostConfigList::iterator pt1 = (*pt).hostConfigList.begin();
122 
123         for ( ; pt1 != (*pt).hostConfigList.end() ; pt1++)
124         {
125             if (ipAddress == (*pt1).IPAddr)
126                 return modulename;
127         }
128     }
129 
130     return modulename;
131 }
132 
133 struct EngineCommRunner
134 {
135     EngineCommRunner(joblist::DistributedEngineComm* jl,
136                      boost::shared_ptr<MessageQueueClient> cl, uint32_t connectionIndex) : jbl(jl), client(cl),
137         connIndex(connectionIndex) {}
138     joblist::DistributedEngineComm* jbl;
139     boost::shared_ptr<MessageQueueClient> client;
140     uint32_t connIndex;
141     void operator()()
142     {
143         //cout << "Listening on client at 0x" << hex << (ptrdiff_t)client << dec << endl;
144         try
145         {
146             jbl->Listen(client, connIndex);
147         }
148         catch (std::exception& ex)
149         {
150             string what(ex.what());
151             cerr << "exception caught in EngineCommRunner: " << what << endl;
152 
153             if (what.find("St9bad_alloc") != string::npos)
154             {
155                 writeToLog(__FILE__, __LINE__, what, LOG_TYPE_CRITICAL);
156 //           abort();
157             }
158             else  writeToLog(__FILE__, __LINE__, what);
159         }
160         catch (...)
161         {
162             string msg("exception caught in EngineCommRunner.");
163             writeToLog(__FILE__, __LINE__, msg);
164             cerr << msg << endl;
165         }
166     }
167 };
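// Illustrative sketch only (not compiled): EngineCommRunner is the functor handed to
// boost::thread by StartClientListener() later in this file. A hypothetical caller
// wires it up roughly like this; 'dec', 'client' and 'connIdx' are assumed names.
//
//     boost::thread* listener = new boost::thread(EngineCommRunner(dec, client, connIdx));
//     // the thread then runs dec->Listen(client, connIdx) until the connection drops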
168 
169 template <typename T>
170 struct QueueShutdown : public unary_function<T&, void>
171 {
172     void operator()(T& x)
173     {
174         x.shutdown();
175     }
176 };
177 
178 }
179 
180 /** Debug macro */
181 #define THROTTLE_DEBUG 0
182 #if THROTTLE_DEBUG
183 #define THROTTLEDEBUG std::cout
184 #else
185 #define THROTTLEDEBUG if (false) std::cout
186 #endif
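// Usage note (illustrative): because THROTTLEDEBUG expands to "if (false) std::cout" when
// THROTTLE_DEBUG is 0, a line like the one below compiles but prints nothing; setting
// THROTTLE_DEBUG to 1 turns the same line into a real std::cout statement.
//
//     THROTTLEDEBUG << "target queue size = " << mqe->targetQueueSize << std::endl;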
187 
188 namespace joblist
189 {
190 DistributedEngineComm* DistributedEngineComm::fInstance = 0;
191 
192 /*static*/
193 DistributedEngineComm* DistributedEngineComm::instance(ResourceManager* rm, bool isExeMgr)
194 {
195     if (fInstance == 0)
196         fInstance = new DistributedEngineComm(rm, isExeMgr);
197 
198     return fInstance;
199 }
200 
201 /*static*/
202 void DistributedEngineComm::reset()
203 {
204     delete fInstance;
205     fInstance = 0;
206 }
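// Illustrative sketch (hypothetical caller, not compiled): the DEC is a process-wide
// singleton keyed off a ResourceManager; 'rm' is assumed to be an existing ResourceManager*.
// Note that instance() itself is not internally synchronized, so callers are expected to
// invoke it from a safe context.
//
//     DistributedEngineComm* dec = DistributedEngineComm::instance(rm, true /*isExeMgr*/);
//     ...
//     DistributedEngineComm::reset();   // tears down the singleton, e.g. between tests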
207 
208 DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr) :
209     fRm(rm),
210     fLBIDShift(fRm->getPsLBID_Shift()),
211     pmCount(0),
212     fIsExeMgr(isExeMgr)
213 {
214 
215     Setup();
216 }
217 
218 DistributedEngineComm::~DistributedEngineComm()
219 {
220     Close();
221     fInstance = 0;
222 }
223 
224 void DistributedEngineComm::Setup()
225 {
226     // This is here to ensure that this function does not get invoked multiple times simultaneously.
227     boost::mutex::scoped_lock setupLock(fSetupMutex);
228 
229     makeBusy(true);
230 
231     // This needs to be here to ensure that whenever Setup() is called, the lists are
232     // empty. It's possible junk was left behind if an exception occurred.
233     ClientList::iterator iter;
234 
235     for (iter = newClients.begin(); iter != newClients.end(); ++iter)
236     {
237         (*iter)->shutdown();
238     }
239 
240     newClients.clear();
241     newLocks.clear();
242 
243     uint32_t newPmCount = fRm->getPsCount();
244     throttleThreshold = fRm->getDECThrottleThreshold();
245     tbpsThreadCount = fRm->getJlNumScanReceiveThreads();
246     unsigned numConnections = getNumConnections();
247     oam::Oam oam;
248     ModuleTypeConfig moduletypeconfig;
249 
250     try
251     {
252         oam.getSystemConfig("pm", moduletypeconfig);
253     }
254     catch (...)
255     {
256         writeToLog(__FILE__, __LINE__, "oam.getSystemConfig error, unknown exception", LOG_TYPE_ERROR);
257         throw runtime_error("Setup failed");
258     }
259 
260     if (newPmCount == 0)
261         writeToLog(__FILE__, __LINE__, "Got a config file with 0 PMs",
262                    LOG_TYPE_CRITICAL);
263 
264     //This needs to make sense when compared to the extent size
265     //     fLBIDShift = static_cast<unsigned>(config::Config::uFromText(fConfig->getConfig(section, "LBID_Shift")));
266 
267     auto* config = fRm->getConfig();
268     std::vector<messageqcpp::AddrAndPortPair> pmsAddressesAndPorts;
269     for (size_t i = 1; i <= newPmCount; ++i)
270     {
271         std::string pmConfigNodeName("PMS");
272         pmConfigNodeName.append(std::to_string(i));
273         // The port returned by getAddressAndPort can be 0 but this will be handled by
274         // MessageQueueClient::connect
275         pmsAddressesAndPorts.push_back(messageqcpp::getAddressAndPort(config,
276                                                                       pmConfigNodeName));
277     }
278 
279     // numConnections must be calculated as number of PMs * number of connections per PM.
280     // This happens earlier in getNumConnections().
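    // Worked example (illustrative numbers): in the loop below, connectionId = i % newPmCount,
    // so with newPmCount = 2 and 4 connections per PM (numConnections = 8), connections
    // 0, 2, 4, 6 dial the PMS1 address/port and connections 1, 3, 5, 7 dial PMS2.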
281     for (size_t i = 0; i < numConnections; ++i)
282     {
283         size_t connectionId = i % newPmCount;
284         boost::shared_ptr<MessageQueueClient> cl(new MessageQueueClient(pmsAddressesAndPorts[connectionId].first,
285                                                                         pmsAddressesAndPorts[connectionId].second));
286         boost::shared_ptr<boost::mutex> nl(new boost::mutex());
287 
288         try
289         {
290             if (cl->connect())
291             {
292                 newClients.push_back(cl);
293                 // assign the module name
294                 cl->moduleName(getModuleNameByIPAddr(moduletypeconfig, cl->addr2String()));
295                 newLocks.push_back(nl);
296                 StartClientListener(cl, i);
297             }
298             else
299             {
300                 throw runtime_error("Connection refused from PMS" + std::to_string(connectionId));
301             }
302         }
303         catch (std::exception& ex)
304         {
305             if (i < newPmCount)
306                 newPmCount--;
307 
308             writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId) + ": " + ex.what(), LOG_TYPE_ERROR);
309             cerr << "Could not connect to PMS" << std::to_string(connectionId) << ": " << ex.what() << endl;
310         }
311         catch (...)
312         {
313             if (i < newPmCount)
314                 newPmCount--;
315 
316             writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId), LOG_TYPE_ERROR);
317         }
318     }
319 
320     // for every entry in newClients up to newPmCount, scan for the same ip in the
321     // first pmCount.  If there is no match, it's a new node,
322     //    call the event listeners' newPMOnline() callbacks.
323     boost::mutex::scoped_lock lock(eventListenerLock);
324 
325     for (uint32_t i = 0; i < newPmCount; i++)
326     {
327         uint32_t j;
328 
329         for (j = 0; j < pmCount; j++)
330         {
331             if (newClients[i]->isSameAddr(*fPmConnections[j]))
332                 break;
333         }
334 
335         if (j == pmCount)
336             for (uint32_t k = 0; k < eventListeners.size(); k++)
337                 eventListeners[k]->newPMOnline(i);
338     }
339 
340     lock.unlock();
341 
342     fWlock.swap(newLocks);
343     fPmConnections.swap(newClients);
344     // memory barrier to prevent the pmCount assignment migrating upward
345     atomicops::atomicMb();
346     pmCount = newPmCount;
347 
348     newLocks.clear();
349     newClients.clear();
350 }
351 
352 int DistributedEngineComm::Close()
353 {
354     //cout << "DistributedEngineComm::Close() called" << endl;
355 
356     makeBusy(false);
357     // for each MessageQueueClient in pmConnections delete the MessageQueueClient;
358     fPmConnections.clear();
359     fPmReader.clear();
360     return 0;
361 }
362 
363 void DistributedEngineComm::Listen(boost::shared_ptr<MessageQueueClient> client, uint32_t connIndex)
364 {
365     SBS sbs;
366 
367     try
368     {
369         while (Busy())
370         {
371             Stats stats;
372             //TODO: This call blocks so setting Busy() in another thread doesn't work here...
373             sbs = client->read(0, NULL, &stats);
374 
375             if (sbs->length() != 0)
376             {
377                 addDataToOutput(sbs, connIndex, &stats);
378             }
379             else // got zero bytes on read, nothing more will come
380                 goto Error;
381         }
382 
383         return;
384     }
385     catch (std::exception& e)
386     {
387         cerr << "DEC Caught EXCEPTION: " << e.what() << endl;
388         goto Error;
389     }
390     catch (...)
391     {
392         cerr << "DEC Caught UNKNOWN EXCEPT" << endl;
393         goto Error;
394     }
395 
396 Error:
397     // @bug 488 - error condition! push 0 length bs to messagequeuemap and
398     // eventually let jobstep error out.
399     boost::mutex::scoped_lock lk(fMlock);
400     MessageQueueMap::iterator map_tok;
401     sbs.reset(new ByteStream(0));
402 
403     for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
404     {
405         map_tok->second->queue.clear();
406         (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
407         map_tok->second->queue.push(sbs);
408     }
409     lk.unlock();
410 
411     if (fIsExeMgr)
412     {
413         //std::cout << "WARNING: DEC READ 0 LENGTH BS FROM "
414         //    << client->otherEnd()<< " OR GOT AN EXCEPTION READING" << std::endl;
415         decltype(pmCount) originalPMCount = pmCount;
416         // Re-establish if a remote PM restarted.
417         std::this_thread::sleep_for(std::chrono::seconds(3));
418         Setup();
419         if (originalPMCount != pmCount)
420         {
421             ostringstream os;
422             os << "DEC: lost connection to " << client->addr2String();
423             writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_ERROR);
424         }
425 
426 /*
427         // reset the pmconnection vector
428         ClientList tempConns;
429         boost::mutex::scoped_lock onErrLock(fOnErrMutex);
430         string moduleName = client->moduleName();
431         //cout << "moduleName=" << moduleName << endl;
432         for ( uint32_t i = 0; i < fPmConnections.size(); i++)
433         {
434             if (moduleName != fPmConnections[i]->moduleName())
435                 tempConns.push_back(fPmConnections[i]);
436             //else
437             //cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << fPmConnections[i]->moduleName() << endl;
438         }
439 
440         if (tempConns.size() == fPmConnections.size()) return;
441 
442         fPmConnections.swap(tempConns);
443         pmCount = (pmCount == 0 ? 0 : pmCount - 1);
444         //cout << "PMCOUNT=" << pmCount << endl;
445 
446         // log it
447         ostringstream os;
448         os << "DEC: lost connection to " << client->addr2String();
449         writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL);
450 */
451     }
452     return;
453 }
454 
455 void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
456 {
457     bool b;
458 
459     boost::mutex* lock = new boost::mutex();
460     condition* cond = new condition();
461     boost::shared_ptr<MQE> mqe(new MQE(pmCount));
462 
463     mqe->queue = StepMsgQueue(lock, cond);
464     mqe->sendACKs = sendACKs;
465     mqe->throttled = false;
466 
467     boost::mutex::scoped_lock lk ( fMlock );
468     b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second;
469 
470     if (!b)
471     {
472         ostringstream os;
473         os << "DEC: attempt to add a queue with a duplicate ID " << key << endl;
474         throw runtime_error(os.str());
475     }
476 }
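// Illustrative lifecycle sketch (hypothetical job-step caller, not compiled): a consumer
// registers its uniqueID before any work is sent, then reads results and always removes
// the queue when done. 'dec', 'uniqueID' and 'requestBs' are assumed names.
//
//     dec->addQueue(uniqueID, true);        // true => ACK/flow-control protocol is used
//     dec->write(uniqueID, requestBs);      // e.g. a BATCH_PRIMITIVE_RUN message
//     SBS result;
//     dec->read(uniqueID, result);          // blocks; an empty ByteStream signals an error
//     dec->removeQueue(uniqueID);           // shuts down and erases the queue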
477 
478 void DistributedEngineComm::removeQueue(uint32_t key)
479 {
480     boost::mutex::scoped_lock lk(fMlock);
481     MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
482 
483     if (map_tok == fSessionMessages.end())
484         return;
485 
486     map_tok->second->queue.shutdown();
487     map_tok->second->queue.clear();
488     fSessionMessages.erase(map_tok);
489 }
490 
491 void DistributedEngineComm::shutdownQueue(uint32_t key)
492 {
493     boost::mutex::scoped_lock lk(fMlock);
494     MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
495 
496     if (map_tok == fSessionMessages.end())
497         return;
498 
499     map_tok->second->queue.shutdown();
500     map_tok->second->queue.clear();
501 }
502 
503 void DistributedEngineComm::read(uint32_t key, SBS& bs)
504 {
505     boost::shared_ptr<MQE> mqe;
506 
507     //Find the StepMsgQueueList for this session
508     boost::mutex::scoped_lock lk(fMlock);
509     MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
510 
511     if (map_tok == fSessionMessages.end())
512     {
513         ostringstream os;
514 
515         os << "DEC: attempt to read(bs) from a nonexistent queue\n";
516         throw runtime_error(os.str());
517     }
518 
519     mqe = map_tok->second;
520     lk.unlock();
521 
522     //this method can block: you can't hold any locks here...
523     TSQSize_t queueSize = mqe->queue.pop(&bs);
524 
525     if (bs && mqe->sendACKs)
526     {
527         boost::mutex::scoped_lock lk(ackLock);
528 
529         if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
530             setFlowControl(false, key, mqe);
531 
532         vector<SBS> v;
533         v.push_back(bs);
534         sendAcks(key, v, mqe, queueSize.size);
535     }
536 
537     if (!bs)
538         bs.reset(new ByteStream());
539 }
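// Illustrative consumer loop (hypothetical caller, not compiled): read() never returns a
// null SBS; an empty ByteStream means the queue was shut down or the Listen() error path
// pushed a 0-length message, so the caller should stop and let the job step report it.
//
//     SBS bs;
//     for (;;)
//     {
//         dec->read(uniqueID, bs);
//         if (bs->length() == 0)
//             break;               // shutdown or lost-connection signal
//         processResult(bs);       // hypothetical consumer
//     }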
540 
541 const ByteStream DistributedEngineComm::read(uint32_t key)
542 {
543     SBS sbs;
544     boost::shared_ptr<MQE> mqe;
545 
546     //Find the StepMsgQueueList for this session
547     boost::mutex::scoped_lock lk(fMlock);
548     MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
549 
550     if (map_tok == fSessionMessages.end())
551     {
552         ostringstream os;
553 
554         os << "DEC: read(): attempt to read from a nonexistent queue\n";
555         throw runtime_error(os.str());
556     }
557 
558     mqe = map_tok->second;
559     lk.unlock();
560 
561     TSQSize_t queueSize = mqe->queue.pop(&sbs);
562 
563     if (sbs && mqe->sendACKs)
564     {
565         boost::mutex::scoped_lock lk(ackLock);
566 
567         if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
568             setFlowControl(false, key, mqe);
569 
570         vector<SBS> v;
571         v.push_back(sbs);
572         sendAcks(key, v, mqe, queueSize.size);
573     }
574 
575     if (!sbs)
576         sbs.reset(new ByteStream());
577 
578     return *sbs;
579 }
580 
581 void DistributedEngineComm::read_all(uint32_t key, vector<SBS>& v)
582 {
583     boost::shared_ptr<MQE> mqe;
584 
585     boost::mutex::scoped_lock lk(fMlock);
586     MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
587 
588     if (map_tok == fSessionMessages.end())
589     {
590         ostringstream os;
591         os << "DEC: read_all(): attempt to read from a nonexistent queue\n";
592         throw runtime_error(os.str());
593     }
594 
595     mqe = map_tok->second;
596     lk.unlock();
597 
598     mqe->queue.pop_all(v);
599 
600     if (mqe->sendACKs)
601     {
602         boost::mutex::scoped_lock lk(ackLock);
603         sendAcks(key, v, mqe, 0);
604     }
605 }
606 
607 void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS>& v,
608                                       bool* flowControlOn)
609 {
610     boost::shared_ptr<MQE> mqe;
611 
612     boost::mutex::scoped_lock lk(fMlock);
613     MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
614 
615     if (map_tok == fSessionMessages.end())
616     {
617         ostringstream os;
618 
619         os << "DEC: read_some(): attempt to read from a nonexistent queue\n";
620         throw runtime_error(os.str());
621     }
622 
623     mqe = map_tok->second;
624     lk.unlock();
625 
626     TSQSize_t queueSize = mqe->queue.pop_some(divisor, v, 1);   // need to play with the min #
627 
628     if (flowControlOn)
629         *flowControlOn = false;
630 
631     if (mqe->sendACKs)
632     {
633         boost::mutex::scoped_lock lk(ackLock);
634 
635         if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
636             setFlowControl(false, key, mqe);
637 
638         sendAcks(key, v, mqe, queueSize.size);
639 
640         if (flowControlOn)
641             *flowControlOn = mqe->throttled;
642     }
643 }
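// Illustrative sketch (hypothetical caller, not compiled): read_some() is meant to let
// several receive threads share one queue, each call draining roughly a 1/divisor share of
// the queued messages (that proportion is an assumption about pop_some()); flowControlOn
// reports whether the sender side is currently throttled.
//
//     vector<SBS> batch;
//     bool throttled = false;
//     dec->read_some(uniqueID, numReceiveThreads /*assumed divisor*/, batch, &throttled);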
644 
645 void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
646                                      boost::shared_ptr<MQE> mqe, size_t queueSize)
647 {
648     ISMPacketHeader* ism;
649     uint32_t l_msgCount = msgs.size();
650 
651     /* If the current queue size > target, do nothing.
652      * If the original queue size > target, ACK the msgs below the target.
653      */
654     if (!mqe->throttled || queueSize >= mqe->targetQueueSize)
655     {
656         /* no acks will be sent, but update unackedwork to keep the #s accurate */
657         uint16_t numack = 0;
658         uint32_t sockidx = 0;
659 
660         while (l_msgCount > 0)
661         {
662             nextPMToACK(mqe, l_msgCount, &sockidx, &numack);
663             idbassert(numack <= l_msgCount);
664             l_msgCount -= numack;
665         }
666 
667         return;
668     }
669 
670     size_t totalMsgSize = 0;
671 
672     for (uint32_t i = 0; i < msgs.size(); i++)
673         totalMsgSize += msgs[i]->lengthWithHdrOverhead();
674 
675     if (queueSize + totalMsgSize > mqe->targetQueueSize)
676     {
677         /* update unackedwork for the overage that will never be acked */
678         int64_t overage = queueSize + totalMsgSize - mqe->targetQueueSize;
679         uint16_t numack = 0;
680         uint32_t sockidx = 0;
681         uint32_t msgsToIgnore;
682 
683         for (msgsToIgnore = 0; overage >= 0; msgsToIgnore++)
684             overage -= msgs[msgsToIgnore]->lengthWithHdrOverhead();
685 
686         if (overage < 0)
687             msgsToIgnore--;
688 
689         l_msgCount = msgs.size() - msgsToIgnore;  // this num gets acked
690 
691         while (msgsToIgnore > 0)
692         {
693             nextPMToACK(mqe, msgsToIgnore, &sockidx, &numack);
694             idbassert(numack <= msgsToIgnore);
695             msgsToIgnore -= numack;
696         }
697     }
698 
699     if (l_msgCount > 0)
700     {
701         ByteStream msg(sizeof(ISMPacketHeader));
702         uint16_t* toAck;
703         vector<bool> pmAcked(pmCount, false);
704 
705         ism = (ISMPacketHeader*) msg.getInputPtr();
706         // The only var checked by ReadThread is the Command var.  The others
707         // are wasted space.  We hijack the Size and Flags fields for the
708         // params to the ACK msg.
709 
710         ism->Interleave = uniqueID;
711         ism->Command = BATCH_PRIMITIVE_ACK;
712         toAck = &ism->Size;
713 
714         msg.advanceInputPtr(sizeof(ISMPacketHeader));
715 
716         while (l_msgCount > 0)
717         {
718             /* could have to send up to pmCount ACKs */
719             uint32_t sockIndex = 0;
720 
721             /* This will reset the ACK field in the Bytestream directly, and nothing
722              * else needs to change if multiple msgs are sent. */
723             nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
724             idbassert(*toAck <= l_msgCount);
725             l_msgCount -= *toAck;
726             pmAcked[sockIndex] = true;
727             writeToClient(sockIndex, msg);
728         }
729 
730         // @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
731         // This applies to the big message case only.  For small messages, the flow control is
732         // disabled when the queue size is below the disableThreshold.
733         if (mqe->hasBigMsgs)
734         {
735             uint64_t totalUnackedWork = 0;
736 
737             for (uint32_t i = 0; i < pmCount; i++)
738                 totalUnackedWork += mqe->unackedWork[i];
739 
740             if (totalUnackedWork == 0)
741             {
742                 *toAck = 1;
743 
744                 for (uint32_t i = 0; i < pmCount; i++)
745                 {
746                     if (!pmAcked[i])
747                         writeToClient(i, msg);
748                 }
749             }
750         }
751     }
752 }
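// Worked example of the overage math above (illustrative numbers): suppose
// mqe->targetQueueSize = 100, queueSize = 90 and msgs holds three messages of 10 bytes each,
// so totalMsgSize = 30. Then overage starts at 90 + 30 - 100 = 20; the loop subtracts 10
// three times, leaving overage = -10 with msgsToIgnore = 3, the post-loop decrement makes
// msgsToIgnore = 2, and only the remaining l_msgCount = 1 message is ACKed back to the PMs.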
753 
754 void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t maxAck,
755                                         uint32_t* sockIndex, uint16_t* numToAck)
756 {
757     uint32_t i;
758     uint32_t& nextIndex = mqe->ackSocketIndex;
759 
760     /* Other threads can be touching mqe->unackedWork at the same time, but because of
761      * the locking env, mqe->unackedWork can only grow; whatever gets latched in this fcn
762      * is a safe minimum at the point of use. */
763 
764     if (mqe->unackedWork[nextIndex] >= maxAck)
765     {
766         (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], maxAck);
767         *sockIndex = nextIndex;
768         //FIXME: we're going to truncate here from 32 to 16 bits. Hopefully this will always fit...
769         *numToAck = maxAck;
770 
771         if (pmCount > 0)
772             nextIndex = (nextIndex + 1) % pmCount;
773 
774         return;
775     }
776     else
777     {
778         for (i = 0; i < pmCount; i++)
779         {
780             uint32_t curVal = mqe->unackedWork[nextIndex];
781             uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal);
782 
783             if (unackedWork > 0)
784             {
785                 (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], unackedWork);
786                 *sockIndex = nextIndex;
787                 *numToAck = unackedWork;
788 
789                 if (pmCount > 0)
790                     nextIndex = (nextIndex + 1) % pmCount;
791 
792                 return;
793             }
794 
795             if (pmCount > 0)
796                 nextIndex = (nextIndex + 1) % pmCount;
797         }
798 
799         cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! ";
800 
801         for (i = 0; i < pmCount; i++)
802             cerr << mqe->unackedWork[i] << " ";
803 
804         cerr << " max: " << maxAck;
805         cerr << endl;
806         //make sure the returned vars are legitimate
807         *sockIndex = nextIndex;
808         *numToAck = maxAck / pmCount;
809 
810         if (pmCount > 0)
811             nextIndex = (nextIndex + 1) % pmCount;
812 
813         return;
814     }
815 }
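// Worked example (illustrative numbers): with pmCount = 3, nextIndex = 0,
// unackedWork = {5, 0, 2} and maxAck = 4, the fast path applies because unackedWork[0] >= 4:
// 4 units are subtracted from PM 0, *sockIndex = 0, *numToAck = 4 and nextIndex advances to 1.
// A following call with maxAck = 4 finds unackedWork[1] == 0, skips to PM 2 in the scan loop
// and ACKs the remaining 2 units there.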
816 
817 void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boost::shared_ptr<MQE> mqe)
818 {
819     mqe->throttled = enabled;
820     ByteStream msg(sizeof(ISMPacketHeader));
821     ISMPacketHeader* ism = (ISMPacketHeader*) msg.getInputPtr();
822 
823     ism->Interleave = uniqueID;
824     ism->Command = BATCH_PRIMITIVE_ACK;
825     ism->Size = (enabled ? 0 : -1);
826 
827 #ifdef VALGRIND
828     /* XXXPAT: For testing in valgrind, init the vars that don't get used */
829     ism->Flags = 0;
830     ism->Type = 0;
831     ism->MsgCount = 0;
832     ism->Status = 0;
833 #endif
834 
835     msg.advanceInputPtr(sizeof(ISMPacketHeader));
836 
837     for (uint32_t i = 0; i < mqe->pmCount; i++)
838         writeToClient(i, msg);
839 }
840 
841 void DistributedEngineComm::write(uint32_t senderID, ByteStream& msg)
842 {
843     ISMPacketHeader* ism = (ISMPacketHeader*) msg.buf();
844     uint32_t dest;
845     uint32_t numConn = fPmConnections.size();
846 
847     if (numConn > 0)
848     {
849         switch (ism->Command)
850         {
851             case BATCH_PRIMITIVE_CREATE:
852                 /* Disable flow control initially */
853                 msg << (uint32_t) - 1;
854                 /* FALLTHRU */
855 
856             case BATCH_PRIMITIVE_DESTROY:
857             case BATCH_PRIMITIVE_ADD_JOINER:
858             case BATCH_PRIMITIVE_END_JOINER:
859             case BATCH_PRIMITIVE_ABORT:
860             case DICT_CREATE_EQUALITY_FILTER:
861             case DICT_DESTROY_EQUALITY_FILTER:
862                 /* XXXPAT: This relies on the assumption that the first pmCount "PMS*"
863                 entries in the config file point to unique PMs */
864                 uint32_t i;
865 
866                 for (i = 0; i < pmCount; i++)
867                     writeToClient(i, msg, senderID);
868 
869                 return;
870 
871             case BATCH_PRIMITIVE_RUN:
872             case DICT_TOKEN_BY_SCAN_COMPARE:
873                 // for efficiency, writeToClient() grabs the interleaving factor for the caller,
874                 // and decides the final connection index because it already grabs the
875                 // caller's queue information
876                 dest = ism->Interleave;
877                 writeToClient(dest, msg, senderID, true);
878                 break;
879 
880             default:
881                 idbassert_s(0, "Unknown message type");
882         }
883     }
884     else
885     {
886         writeToLog(__FILE__, __LINE__, "No PrimProcs are running", LOG_TYPE_DEBUG);
887         throw IDBExcept(ERR_NO_PRIMPROC);
888     }
889 }
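// Illustrative sketch (hypothetical caller, not compiled): the dispatch above is driven by
// the ISMPacketHeader at the front of the ByteStream. Create/destroy/joiner messages are
// broadcast to every PM, while BATCH_PRIMITIVE_RUN is routed through writeToClient() using
// the Interleave field. The header setup below is a simplified assumption, not the full
// message layout.
//
//     ByteStream bs;
//     ISMPacketHeader ism;
//     memset(&ism, 0, sizeof(ism));
//     ism.Command = BATCH_PRIMITIVE_RUN;
//     ism.Interleave = destIndex;                     // hypothetical destination hint
//     bs.load((const uint8_t*) &ism, sizeof(ism));
//     // ... append the PrimitiveHeader and the primitive payload ...
//     dec->write(uniqueID, bs);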
890 
891 void DistributedEngineComm::write(messageqcpp::ByteStream& msg, uint32_t connection)
892 {
893     ISMPacketHeader* ism = (ISMPacketHeader*) msg.buf();
894     PrimitiveHeader* pm = (PrimitiveHeader*) (ism + 1);
895     uint32_t senderID = pm->UniqueID;
896 
897     boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t());
898     MessageQueueMap::iterator it;
899     // This keeps mqe's stats from being freed until end of function
900     boost::shared_ptr<MQE> mqe;
901     Stats* senderStats = NULL;
902 
903     lk.lock();
904     it = fSessionMessages.find(senderID);
905 
906     if (it != fSessionMessages.end())
907     {
908         mqe = it->second;
909         senderStats = &(mqe->stats);
910     }
911 
912     lk.unlock();
913 
914     newClients[connection]->write(msg, NULL, senderStats);
915 }
916 
917 void DistributedEngineComm::StartClientListener(boost::shared_ptr<MessageQueueClient> cl, uint32_t connIndex)
918 {
919     boost::thread* thrd = new boost::thread(EngineCommRunner(this, cl, connIndex));
920     fPmReader.push_back(thrd);
921 }
922 
923 void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* stats)
924 {
925     ISMPacketHeader* hdr = (ISMPacketHeader*)(sbs->buf());
926     PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1);
927     uint32_t uniqueId = p->UniqueID;
928     boost::shared_ptr<MQE> mqe;
929 
930     boost::mutex::scoped_lock lk(fMlock);
931     MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);
932 
933     if (map_tok == fSessionMessages.end())
934     {
935         // For debugging...
936         //cerr << "DistributedEngineComm::AddDataToOutput: tried to add a message to a dead session: " << uniqueId << ", size " << sbs->length() << ", step id " << p->StepID << endl;
937         return;
938     }
939 
940     mqe = map_tok->second;
941     lk.unlock();
942 
943     if (pmCount > 0)
944     {
945         (void)atomicops::atomicInc(&mqe->unackedWork[connIndex % pmCount]);
946     }
947 
948     TSQSize_t queueSize = mqe->queue.push(sbs);
949 
950     if (mqe->sendACKs)
951     {
952         boost::mutex::scoped_lock lk(ackLock);
953         uint64_t msgSize = sbs->lengthWithHdrOverhead();
954 
955         if (!mqe->throttled && msgSize > (targetRecvQueueSize / 2))
956             doHasBigMsgs(mqe, (300 * 1024 * 1024 > 3 * msgSize ?
957                                300 * 1024 * 1024 : 3 * msgSize)); //buffer at least 3 big msgs
958 
959         if (!mqe->throttled && queueSize.size >= mqe->targetQueueSize)
960             setFlowControl(true, uniqueId, mqe);
961     }
962 
963     if (stats)
964         mqe->stats.dataRecvd(stats->dataRecvd());
965 }
966 
967 void DistributedEngineComm::doHasBigMsgs(boost::shared_ptr<MQE> mqe, uint64_t targetSize)
968 {
969     mqe->hasBigMsgs = true;
970 
971     if (mqe->targetQueueSize < targetSize)
972         mqe->targetQueueSize = targetSize;
973 }
974 
975 int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uint32_t sender, bool doInterleaving)
976 {
977     boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t());
978     MessageQueueMap::iterator it;
979     // Keep mqe's stats from being freed early
980     boost::shared_ptr<MQE> mqe;
981     Stats* senderStats = NULL;
982     uint32_t interleaver = 0;
983 
984     if (fPmConnections.size() == 0)
985         return 0;
986 
987     if (sender != numeric_limits<uint32_t>::max())
988     {
989         lk.lock();
990         it = fSessionMessages.find(sender);
991 
992         if (it != fSessionMessages.end())
993         {
994             mqe = it->second;
995             senderStats = &(mqe->stats);
996 
997             if (doInterleaving)
998                 interleaver = it->second->interleaver[index % it->second->pmCount]++;
999         }
1000 
1001         lk.unlock();
1002     }
1003 
1004     try
1005     {
1006         if (doInterleaving)
1007             index = (index + (interleaver * pmCount)) % fPmConnections.size();
1008 
1009         ClientList::value_type client = fPmConnections[index];
1010 
1011         if (!client->isAvailable()) return 0;
1012 
1013         boost::mutex::scoped_lock lk(*(fWlock[index]));
1014         client->write(bs, NULL, senderStats);
1015         return 0;
1016     }
1017     catch (...)
1018     {
1019         // @bug 488. Error out under this condition instead of retrying another connection,
1020         // by pushing a 0-size bytestream to the message queue and throwing an exception.
1021         SBS sbs;
1022         lk.lock();
1023         //std::cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << std::endl;
1024         MessageQueueMap::iterator map_tok;
1025         sbs.reset(new ByteStream(0));
1026 
1027         for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
1028         {
1029             map_tok->second->queue.clear();
1030             (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
1031             map_tok->second->queue.push(sbs);
1032         }
1033 
1034         lk.unlock();
1035         /*
1036         		// reconfig the connection array
1037         		ClientList tempConns;
1038         		{
1039         			//cout << "WARNING: DEC WRITE BROKEN PIPE " << fPmConnections[index]->otherEnd()<< endl;
1040         			boost::mutex::scoped_lock onErrLock(fOnErrMutex);
1041         			string moduleName = fPmConnections[index]->moduleName();
1042         			//cout << "module name = " << moduleName << endl;
1043         			if (index >= fPmConnections.size()) return 0;
1044 
1045         			for (uint32_t i = 0; i < fPmConnections.size(); i++)
1046         			{
1047         				if (moduleName != fPmConnections[i]->moduleName())
1048         					tempConns.push_back(fPmConnections[i]);
1049         			}
1050         			if (tempConns.size() == fPmConnections.size()) return 0;
1051         			fPmConnections.swap(tempConns);
1052         			pmCount = (pmCount == 0 ? 0 : pmCount - 1);
1053         		}
1054         // send alarm
1055         ALARMManager alarmMgr;
1056         string alarmItem("UNKNOWN");
1057 
1058         if (index < fPmConnections.size())
1059         {
1060             alarmItem = fPmConnections[index]->addr2String();
1061         }
1062 
1063         alarmItem.append(" PrimProc");
1064         alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);
1065         */
1066         throw runtime_error("DistributedEngineComm::write: Broken Pipe error");
1067     }
1068 }
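// Worked example of the interleaving above (illustrative numbers): with pmCount = 2 and
// fPmConnections.size() = 8, a caller that always targets index = 1 sees its successive
// interleaver values 0, 1, 2, 3 map to connection indexes (1 + k * 2) % 8 = 1, 3, 5, 7,
// so traffic for that PM is spread over all of that PM's connections in turn.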
1069 
1070 uint32_t DistributedEngineComm::size(uint32_t key)
1071 {
1072     boost::mutex::scoped_lock lk(fMlock);
1073     MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
1074 
1075     if (map_tok == fSessionMessages.end())
1076         throw runtime_error("DEC::size() attempt to get the size of a nonexistent queue!");
1077 
1078     boost::shared_ptr<MQE> mqe = map_tok->second;
1079     //TODO: should probably check that this is a valid iter...
1080     lk.unlock();
1081     return mqe->queue.size().count;
1082 }
1083 
1084 void DistributedEngineComm::addDECEventListener(DECEventListener* l)
1085 {
1086     boost::mutex::scoped_lock lk(eventListenerLock);
1087     eventListeners.push_back(l);
1088 }
1089 
1090 void DistributedEngineComm::removeDECEventListener(DECEventListener* l)
1091 {
1092     boost::mutex::scoped_lock lk(eventListenerLock);
1093     std::vector<DECEventListener*> newListeners;
1094     uint32_t s = eventListeners.size();
1095 
1096     for (uint32_t i = 0; i < s; i++)
1097         if (eventListeners[i] != l)
1098             newListeners.push_back(eventListeners[i]);
1099 
1100     eventListeners.swap(newListeners);
1101 }
1102 
1103 Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID)
1104 {
1105     boost::mutex::scoped_lock lk(fMlock);
1106     MessageQueueMap::iterator it;
1107     Stats empty;
1108 
1109     it = fSessionMessages.find(uniqueID);
1110 
1111     if (it != fSessionMessages.end())
1112         return it->second->stats;
1113 
1114     return empty;
1115 }
1116 
1117 DistributedEngineComm::MQE::MQE(uint32_t pCount) : ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false),
1118     targetQueueSize(targetRecvQueueSize)
1119 {
1120     unackedWork.reset(new volatile uint32_t[pmCount]);
1121     interleaver.reset(new uint32_t[pmCount]);
1122     memset((void*) unackedWork.get(), 0, pmCount * sizeof(uint32_t));
1123     memset((void*) interleaver.get(), 0, pmCount * sizeof(uint32_t));
1124 }
1125 
1126 }
1127 // vim:ts=4 sw=4:
1128